A mutable reduction operation (takes a sequence of input elements and combines them into a single summary result) that accumulates input elements into a mutable result container, optionally transforming the accumulated result into a final representation after all input elements have been processed. Reduction operations can be performed either sequentially or in parallel.
Interface Collector<T, A, R>
Type Parameters:
T - the type of input elements to the reduction operation
A - the mutable accumulation type of the reduction operation (often hidden as an implementation detail)
R - the result type of the reduction operation
A Collector is specified by four functions that work together to accumulate entries into a mutable result container, and optionally perform a final transform on the result. They are:
creation of a new result container (supplier())
incorporating a new data element into a result container (accumulator())
combining two result containers into one (combiner())
performing an optional final transform on the container (finisher())
Collectors also have a set of characteristics, such as Collector.Characteristics.CONCURRENT, that provide hints that can be used by a reduction implementation to provide better performance.
Every collector must implement the following methods
interface Collector<T, A, R> {
Supplier<A> supplier ()
BiConsumer<A, T> acumulator ()
BinaryOperator<A> combiner ()
Function<A, R> finisher ()
Set<Characteristics> characteristics ()
}
Sequential Processing- collector create a single result container using the supplier function and invoke the accumulator function once for each input element.
Parallel Processing- collector create a result container for each partition, accumulate the contents of each partition into a sub result for that partition, and then use the combiner function to merge the sub results into a combined result.
Ways to create custom collector
1. Custom collector
public class EmployeeApp {
public static void main(String[] args) {
List<Employee> employees = getEmployees();
//creation of a new result container
Supplier<StringBuilder> supplier = () -> new StringBuilder();
//incorporating a new data element into a result
BiConsumer<StringBuilder, Employee> accumulator = (builder, employee) -> builder.append(employee.getName()).append("|");
//combining two result containers into one
BinaryOperator<StringBuilder> combiner = (builder1, builder2) -> {
builder1.append(builder2.toString());
return builder1;
};
//performing an optional final transform on the container
Function<StringBuilder, String> finisher = builder -> builder.toString();
Collector<Employee, StringBuilder, String> collector = Collector.of(supplier, accumulator, combiner, finisher);
String nameJoiner = employees.stream().collect(collector);
System.out.println(nameJoiner);
}
private static List<Employee> getEmployees() {
Employee e1 = new Employee("Riyan", "IND", 2000.50, 30);
Employee e2 = new Employee("Sonee", "IND", 7000.50, 40);
Employee e3 = new Employee("Ganesh", "IND", 3000.20, 20);
Employee e4 = new Employee("John", "JPN", 2000.40, 25);
Employee e5 = new Employee("James", "US", 1000.50, 28);
return List.of(e1, e2, e3, e4, e5);
}
@AllArgsConstructor @Getter @Setter @ToString
private static class Employee {
private String name;
private String country;
private double salary;
private int age;
}
}
Output: Riyan|Sonee|Ganesh|John|James|
2. Custom collector implementing using Collector interface
@AllArgsConstructor @Getter @Setter @ToString
public class Product {
private String name;
private BigDecimal price;
private int quantity;
}
public class ProductMergerCollector implements Collector<Product, Map<String, Product>, Map<String, Product>> {
@Override
public Supplier<Map<String, Product>> supplier() {
return () -> new HashMap<>();
}
@Override
public BiConsumer<Map<String, Product>, Product> accumulator() {
return (container, product) -> {
String key = product.getName();
Product oldProduct = container.get(key);
if (oldProduct != null) {
product.setPrice(product.getPrice().add(oldProduct.getPrice()));
product.setQuantity(product.getQuantity() + oldProduct.getQuantity());
}
container.put(key, product);
};
}
@Override
public BinaryOperator<Map<String, Product>> combiner() {
return (container1, container2) -> {
container1.putAll(container2);
return container1;
};
}
@Override
public Function<Map<String, Product>, Map<String, Product>> finisher() {
return container -> container;
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH);
}
}
public class ProductApp {
public static void main(String[] args) {
Product p1 = new Product("P1", new BigDecimal("2000"), 20);
Product p2 = new Product("P2", new BigDecimal("3000"), 30);
Product p3 = new Product("P3", new BigDecimal("2500"), 10);
Product p4 = new Product("P1", new BigDecimal("2000"), 20);
List<Product> balances = List.of(p1, p2, p3, p4);
Map<String, Product> productGroupByName = balances.stream().collect(new ProductMergerCollector());
productGroupByName.forEach((key, value) -> {
System.out.println(key + " ---> " + value);
});
}
}
Output:
Interface Collector<T, A, R>
Type Parameters:
T - the type of input elements to the reduction operation
A - the mutable accumulation type of the reduction operation (often hidden as an implementation detail)
R - the result type of the reduction operation
A Collector is specified by four functions that work together to accumulate entries into a mutable result container, and optionally perform a final transform on the result. They are:
creation of a new result container (supplier())
incorporating a new data element into a result container (accumulator())
combining two result containers into one (combiner())
performing an optional final transform on the container (finisher())
Collectors also have a set of characteristics, such as Collector.Characteristics.CONCURRENT, that provide hints that can be used by a reduction implementation to provide better performance.
Every collector must implement the following methods
interface Collector<T, A, R> {
Supplier<A> supplier ()
BiConsumer<A, T> acumulator ()
BinaryOperator<A> combiner ()
Function<A, R> finisher ()
Set<Characteristics> characteristics ()
}
Sequential Processing- collector create a single result container using the supplier function and invoke the accumulator function once for each input element.
Parallel Processing- collector create a result container for each partition, accumulate the contents of each partition into a sub result for that partition, and then use the combiner function to merge the sub results into a combined result.
Ways to create custom collector
1. Custom collector
public class EmployeeApp {
public static void main(String[] args) {
List<Employee> employees = getEmployees();
//creation of a new result container
Supplier<StringBuilder> supplier = () -> new StringBuilder();
//incorporating a new data element into a result
BiConsumer<StringBuilder, Employee> accumulator = (builder, employee) -> builder.append(employee.getName()).append("|");
//combining two result containers into one
BinaryOperator<StringBuilder> combiner = (builder1, builder2) -> {
builder1.append(builder2.toString());
return builder1;
};
//performing an optional final transform on the container
Function<StringBuilder, String> finisher = builder -> builder.toString();
Collector<Employee, StringBuilder, String> collector = Collector.of(supplier, accumulator, combiner, finisher);
String nameJoiner = employees.stream().collect(collector);
System.out.println(nameJoiner);
}
private static List<Employee> getEmployees() {
Employee e1 = new Employee("Riyan", "IND", 2000.50, 30);
Employee e2 = new Employee("Sonee", "IND", 7000.50, 40);
Employee e3 = new Employee("Ganesh", "IND", 3000.20, 20);
Employee e4 = new Employee("John", "JPN", 2000.40, 25);
Employee e5 = new Employee("James", "US", 1000.50, 28);
return List.of(e1, e2, e3, e4, e5);
}
@AllArgsConstructor @Getter @Setter @ToString
private static class Employee {
private String name;
private String country;
private double salary;
private int age;
}
}
Output: Riyan|Sonee|Ganesh|John|James|
2. Custom collector implementing using Collector interface
@AllArgsConstructor @Getter @Setter @ToString
public class Product {
private String name;
private BigDecimal price;
private int quantity;
}
public class ProductMergerCollector implements Collector<Product, Map<String, Product>, Map<String, Product>> {
@Override
public Supplier<Map<String, Product>> supplier() {
return () -> new HashMap<>();
}
@Override
public BiConsumer<Map<String, Product>, Product> accumulator() {
return (container, product) -> {
String key = product.getName();
Product oldProduct = container.get(key);
if (oldProduct != null) {
product.setPrice(product.getPrice().add(oldProduct.getPrice()));
product.setQuantity(product.getQuantity() + oldProduct.getQuantity());
}
container.put(key, product);
};
}
@Override
public BinaryOperator<Map<String, Product>> combiner() {
return (container1, container2) -> {
container1.putAll(container2);
return container1;
};
}
@Override
public Function<Map<String, Product>, Map<String, Product>> finisher() {
return container -> container;
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH);
}
}
public class ProductApp {
public static void main(String[] args) {
Product p1 = new Product("P1", new BigDecimal("2000"), 20);
Product p2 = new Product("P2", new BigDecimal("3000"), 30);
Product p3 = new Product("P3", new BigDecimal("2500"), 10);
Product p4 = new Product("P1", new BigDecimal("2000"), 20);
List<Product> balances = List.of(p1, p2, p3, p4);
Map<String, Product> productGroupByName = balances.stream().collect(new ProductMergerCollector());
productGroupByName.forEach((key, value) -> {
System.out.println(key + " ---> " + value);
});
}
}
Output:
P1 ---> Product(name=P1, price=4000, quantity=40)
P2 ---> Product(name=P2, price=3000, quantity=30)
P3 ---> Product(name=P3, price=2500, quantity=10)
Comments
Post a Comment