Java 8 Streams

Это руководство дает не только общее представление о Stream в Java 8, но и полезные знания которые будут востребованы в будущем. Когда я впервые узнал о Stream API, я был удивлен таким названием, поскольку это звучит очень похоже на InputStream или OutputStream из Java I/O. Но Java 8 Stream это совсем другое. Streams (стримы/потоки) являются Монадами, которые играют большую роль в обеспечении функционального программирования в Java.

В функциональном программировании монада является структурой, которая представляет вычисления в виде последовательности шагов.

Это руководство научит вас основам работы с Java 8 Streams, а также научит использовать операции над потоками Streams. Вы узнаете о порядке обработки и о том, как упорядочение потока операций влияет на производительность во время выполнения. Более мощные операции Stream API: reduce, collect и flatMap также будут подробно описаны. Руководство заканчивается углубленным изучением параллельных потоков.

Как потоки работают?

Поток представляет собой последовательность элементов и поддерживает различные виды операций для выполнения вычислений:

List<String> mList = Arrays.asList("aa1","cc2", "cc1", "aa2", "bb1");
 
mList
    .stream()
    .filter(s -> s.startsWith("a"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);
//Результат выполнения: 
// AA1
// AA2

Операции над потоком относятся либо к промежуточным, либо к терминальным. Все промежуточные операции возвращают поток, так что мы можем объединять несколько промежуточных операций без использования точки с запятой. Терминальные операции возвращают void или непотоковый результат. В приведенном выше примере filter, map и sorted являются промежуточными операциями, тогда как forEach является терминальной операцией. Для получения полного списка всех доступных операций потока смотрите Javadoc по Stream.
Большинство операций потока принимают в качестве параметров какие-то лямбда-выражения, в функциональный интерфейс точное поведение по каждой операции. Большинство этих операций должны быть как неинтерферирующими (non-interfering), так и лишенными состояния (stateless). Что это значит?
Неинтерферирующуя функция не изменяет основной источник данных потока. Например, в приведенном выше примере лямбда выражение не изменяет mList путем добавления или удаления элементов из коллекции.
Лишенная состояния функция — выполнение операции является детерминированным, например, в приведенном выше примере лямбда-выражение не зависит от какой-либо изменяемой переменной или состояния из внешней среды, которая могла бы измениться во время выполнения.

Различные виды потоков (Streams)

Потоки могут быть созданы из различных источников данных, особенно коллекций. Коллекции List и Set поддерживают новые методы stream() и parallelStream(). Параллельные потоки способны работать на нескольких нитях и будут рассмотрены в следующем разделе этого руководства. Мы ориентируемся на последовательные потоки:

Arrays.asList("сс1", "сс2", "сс3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  
//Результат выполнения: сс1

Вызов метода stream() по перечню объектов возвращает поток. Но мы не должны создавать коллекции для того, чтобы работать с потоками. Смотрим на следующем примере кода:

Stream.of("сс1", "сс2", "сс3")
    .findFirst()
    .ifPresent(System.out::println);  
//Результатом будет сс1

Просто используйте Stream.of(), чтобы создать поток с кучей ссылок на объекты.

Кроме регулярных объектов потоков Java 8, еще предоставляются особые виды потоков для работы с примитивными типами данных intlong и double. Как вы уже догадались, это IntStream , LongStream и DoubleStream .

IntStreams может заменить обычный цикл for на изящный IntStream.range() :

IntStream.range(8, 12)
    .forEach(System.out::println);
//Результат выполнения:
// 8
// 9
// 10
// 11

Все эти примитивные потоки работают так же, как и обычные объектные потоки, но со следующими отличиями:
Примитивные потоки используют специализированные лямбда-выражения, например IntFunction Function или IntPredicate вместо Predicate . И примитивные потоки поддерживают дополнительные терминальные операции sum() и average():

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  
 //Результатом выполнения будет 5.0

Иногда это полезно преобразовать поток объекта к примитивному потоку или наоборот. Для этой цели потоки объектов поддерживают специальные операций картирования mapToInt()mapToLong() и mapToDouble() :

Stream.of("c1", "c2", "c3")
    .map(s -> s.substring(1))
    .mapToInt(Integer::parseInt)
    .max()
    .ifPresent(System.out::println);  
//Результатом выполнения будет 3

Примитивные потоки могут быть преобразованы в объектные потоки с помощью метода mapToObj():

IntStream.range(1, 4)    .mapToObj(i -> “с” + i)    .forEach(System.out::println);// Результат выполнения операции:// с1// с2// с3

Вот комбинированный пример: поток с double сначала преобразуется в int’овый поток и затем в объектный поток строк:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "с" + i)
    .forEach(System.out::println);
// Результат выполнения операции:
// с1
// с2
// с3

Порядок обработки

Теперь, когда мы научились создавать и работать с различными видами потоков, давайте углубляться в то, как поток операций обрабатываются под капотом.

Посмотрите на этот образец, где отсутствует терминальная операция:

Stream.of("dd2", "aa2", "bb1", "bb3", "cc4")
    .filter(s -> {
        System.out.println("Фильтр: " + s);
        return true;
    });

При выполнении этого фрагмента кода ничего не печатается на консоль. Это происходит потому, что промежуточные операции будут выполнены только тогда, когда присутствуют терминальные операции.

Давайте расширим предыдущий пример использованием forEach:

Stream.of("dd2", "aa2", "bb1", "bb3", "cc4")
    .filter(s -> {
        System.out.println("Фильтр: " + s);
        return true;
    })
    .forEach(s -> System.out.println("Печать с использованием forEach: " + s));

Выполнив этот код, на консоль выведется следующее:

Фильтр:  dd2
Печать с использованием forEach: dd2
Фильтр: aa2
Печать с использованием forEach: aa2
Фильтр: bb1
Печать с использованием forEach: bb1
Фильтр: bb3
Печать с использованием forEach: bb3
Фильтр: cc4
Печать с использованием forEach: cc4

Порядок выполнения может вас удивить. На первый взгляд все операции будут выполняться по горизонтали одна за другой по всем элементам потока. Но в нашем примере первая строка «dd2» полностью проходит фильтр forEach, потом обрабатывается вторая строка «aa2» и так далее.

Такое поведение может привести к снижению фактического количества операций, выполняемых на каждом элементе. Это мы видим на следующем примере:

Stream.of("dd2", "aa2", "bb1", "bb3", "cc4")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });
// Результат выполнения представлен ниже
 
// map:      dd2
// anyMatch: DD2
// map:      aa2
// anyMatch: AA2

Операция anyMatch возвращает true, как только предикат применится к входному элементу. Это подходит для второго элемента «аа2». В связи с вертикальным исполнением цепи потока, map будет выполнен два раза.

Почему порядок выполнения в stream имеет значение

Следующий пример состоит из двух промежуточных операций map и filter, а также выполнение терминала forEach. Давайте еще раз посмотрим порядок выполнения этих операций:

Stream.of("dd2", "aa2", "bb1", "bb3", "cc4")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));
// Результат выполнения
// map:     dd2
// filter:  DD2
// map:     aa2
// filter:  AA2
// forEach: AA2
// map:     bb1
// filter:  BB1
// map:     bb3
// filter:  BB3
// map:     cc
// filter:  CC

Как вы уже догадались, map и filter называются пять раз для каждой строки в базовой коллекции, тогда как forEach вызывается только один раз.

Мы можем сильно уменьшить фактическое количество выполнений, если мы изменим порядок операций, передвинув filter в начало цепочки:

Stream.of("dd2", "aa2", "bb1", "bb3", "cc4")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));
 
// filter:  dd2
// filter:  aa2
// map:     aa2
// forEach: AA2
// filter:  bb1
// filter:  bb3
// filter:  cc

Теперь, map вызывается только один раз и будет выполняться быстрее для большого количества входных элементов. Имейте это в виду при составлении комплексного метода цепи.

Давайте расширим предыдущий пример с дополнительной операцией, sorted:

Stream.of("dd2", "aa2", "bb1", "bb3", "cc4")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

Сортировка является особым видом промежуточных операций. Это так называемые операции состояния. 

Выполнение этого примера приводит следующий вывод консоли:

sort:    aa2; dd2
sort:    bb1; aa2
sort:    bb1; dd2
sort:    bb1; aa2
sort:    bb3; bb1
sort:    bb3; dd2
sort:    cc4; bb3
sort:    cc4; dd2
filter:  aa2
map:     aa2
forEach: AA2
filter:  bb1
filter:  bb3
filter:  cc4
filter:  dd2

Во-первых, операция сортировки выполняется на всей совокупности входных данных. Другими словами, sorted выполнен в горизонтальном направлении. Таким образом, sorted вызывается 8 раз для нескольких комбинаций на каждом элементе во входной коллекции.

Теперь мы можем оптимизировать производительность:

Stream.of("dd2", "aa2", "bb1", "bb3", "cc4")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));
 
// filter:  dd2
// filter:  aa2
// filter:  bb1
// filter:  bb3
// filter:  cc4
// map:     aa2
// forEach: AA2

В этом примере sorted никогда не вызывали, потому что filter уменьшает входную коллекцию до одного элемента. В этом случае производительность значительно увеличивается для больших входных коллекций.

Повторное использование Потоков (Streams)

Потоки в Java 8 не могут быть использованы повторно. Как только вы называете какую-нибудь терминальную операция, поток закрывается

Stream<String> stream =
    Stream.of("dd2", "aa2", "bb1", "bb3", "cc")
        .filter(s -> s.startsWith("a"));
 
stream.anyMatch(s -> true);    // операция выполнится успешно
stream.noneMatch(s -> true);   // Вылетит Exception

Вызов noneMatch после anyMatch в одном и том же stream вызовет следующее исключение:

Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
	at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
...

Чтобы избежать этого, мы должны создать новую цепь для каждой терминальной операции.

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("dd2", "aa2", "bb1", "bb3", "cc")
            .filter(s -> s.startsWith("a"));
 
streamSupplier.get().anyMatch(s -> true);   // операция пройдет успешно
streamSupplier.get().noneMatch(s -> true);  // здесь также все будет ok

Каждый вызов конструктора get() создает новый поток, с которым мы можем безопасно работать.

Продвинутые операции

Потоки поддерживает множество различных операций. Мы уже узнали о наиболее важных операциях, таких как filter или map. Чтобы ознакомиться с другими доступными операции, загляните в Stream Javadoc.
А теперь давайте глубже познакомимся в более сложными операциями collect, flatMap и reduce.
Для работы с этими операциями напишем следующий код:

class Person {
    String name;
    int age;
 
    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
 
    @Override
    public String toString() {
        return name;
    }
}
 
List<Person> persons =
    Arrays.asList(
        new Person("Andrew", 20),
        new Person("Igor", 23),
        new Person("Ira", 23),
        new Person("Vitia", 12));

Наверняка многие из вас обратили внимание на очень удобный трюк создания и инициализации List в одну строку.

Операция Collect

Collect является чрезвычайно полезной операцией, чтобы превратить элементы потока в List, Set или Map. Collect принимает Collector, который состоит из четырех различных операций: поставщик, аккумулятор, объединитель и финишер. Это звучит очень сложно, но это только на первый взгляд. Фишкой Java 8 является поддержка различных встроенных коллекторов через класс Collectors. Именно поэтому работа с ними будет намного проще.
Давайте начнем с наиболее распространенного случая:

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("I"))
        .collect(Collectors.toList());
 
System.out.println(filtered);    // [Igor, Ira]

Как видите, у нас получилось всего 6 строчек кода.

Следующий пример группирует всех по возрасту:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));
 
personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
 
// age 20: [Andrew]
// age 23: [Igor, Ira]
// age 12: [Vitia]

Collectors — чрезвычайно универсальные. Вы также можете создавать группы элементов потока, например, определение среднего возраста всех лиц:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));
 
System.out.println(averageAge);     // 19.5

Если вы заинтересованы в более полной статистике, то collectors возвращает специальный встроенный объект со сводной статистикой. Таким образом, мы можем просто определить минимальный, максимальный и средний арифметический возраст, а также найти сумму и количество.

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));
 
System.out.println(ageSummary);
// Результат выполнения: 
// IntSummaryStatistics{count=4, sum=78, min=12, average=19.500000, max=23}

Следующий пример соединяет всех людей в одну строку:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" и ", "В Германии ", " совершеннолетние."));
 
System.out.println(phrase);
// В Германии Andrew и Igor и Ira совершеннолетние.

Коллектор join принимает разделитель, а также дополнительный префикс и суффикс.

Для того чтобы трансформировать элементы потока в map, мы должны указать, как ключи и значения должны быть нанесены на map. Имейте в виду, что замапенные ключи должны быть уникальными, иначе IllegalStateException не избежать. Вы можете передать функцию слияния в качестве дополнительного параметра, чтобы обойти исключение:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));
 
System.out.println(map);
// {20=Andrew, 23=Igor;Ira, 12=Vitya}

Теперь, когда мы знаем, некоторые мощные встроенные коллекторы, давайте попробуем построить свой собственный специальный коллектор. Мы хотим превратить всех людей в потоке в одну строку, состоящую из всех имен в верхнем регистре, разделенных знаком "|".

Для достижения этого мы создаем новый коллектор через Collector.of(). Мы должны пройти четыре этапа использования collectors: supplieraccumulatorcombiner и finisher.

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher
 
String names = persons
    .stream()
    .collect(personNameCollector);
 
System.out.println(names);  // ANDREW | IGOR | IRA | VITIA

Так как строки в Java неизменные, нам нужен вспомогательный класс StringJoiner, чтобы коллектор мог построить нашу строку. Supplier изначально создает такой StringJoiner с соответствующим разделителем. Accumulator используется для добавления имени каждого человека в верхний регистр. Combiner знает как объединить два StringJoiner в один. На последнем этапе Finisher строит желаемую строку из StringJoiner.

FlatMap

Мы уже научились преобразовывать объекты потока в другой тип объектов, используя операции с map. Map ограничена, потому что каждый объект может быть отображен только одним объектом. Но что, если мы хотим преобразовать один объект в нескольких других? На помощь здесь приходит flatMap.

FlatMap преобразует каждый элемент потока в поток других объектов. Таким образом, каждый объект будет преобразован в ноль, один или несколько других объектов, поддерживаемых потоком. Содержание этих потоков будет затем помещают в возвращаемом потоке flatMap операции.

Прежде, чем мы увидим flatMap в действии мы должны соответствующий тип иерархии:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();
 
    Foo(String name) {
        this.name = name;
    }
}
 
class Bar {
    String name;
 
    Bar(String name) {
        this.name = name;
    }
}

Далее, мы используем наши знания о потоках для того, чтобы создать несколько объектов:

List<Foo> foos = new ArrayList<>();
 
// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));
 
// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

Теперь у нас есть список из трех функций, каждая из которых состоит из трех баров. FlatMap принимает функцию, которая должна возвращать поток объектов:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
 
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

Как видите, мы успешно преобразовали поток трех объектов Foo в поток девяти объектов.

Наконец, код выше можно упростить:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " + f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

FlatMap также доступна для Optional класса, введенного в Java 8. Optional операции класса flatMap возвращает дополнительный объект другого типа..

Посмотрите на иерархическую структуру типа этой:

class Outer {
    Nested nested;
}
 
class Nested {
    Inner inner;
}
 
class Inner {
    String foo;
}

Далее нужно добавить многочисленные проверки на нулевые значения.

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

Такое же поведение можно получить, используя операцию optionalflatMap :

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

Каждый вызов flatMap возвращает Optional обертку объекта.

Reduce

Операция Reduce сочетает в себе все элементы потока в единый результат. Java 8 поддерживает три различных вида reduce метода.  Давайте посмотрим, как мы можем использовать один метод для определения самого старшего человека:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Ira

Первый Reduce метод

Reduce метод принимает функцию аккумулятора BinaryOperator. Это на самом деле BiFunction, когда оба операнда имеют один и тот же тип, в этом случае Person. BiFunctions похожи на Function, но принимает два аргумента. Пример функции сравнивает людей по возрасту и возвращает самого старшего.

Второй Reduce метод

Второй Reduce метод принимает идентифицирующее значение и BinaryOperator. Этот метод может быть использован для «создания» нового человека с агрегированным имен и возрастом других человек в потоке:

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });
 
System.out.format("name=%s; age=%s", result.name, result.age);
// name=AndrewIgorIraVitia; age=78

Третий Reduce

Третий Reduce  метод принимает три параметра: значение идентификатора,BiFunction аккумулятор и объединитель функции типа BinaryOperator. Поскольку идентифицирующее значение не ограничивает тип Person, мы можем использовать это сокращение для определения суммы возрасте от всех лиц:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
 
System.out.println(ageSum);  // 78

Как видим, результат получился 78, но что же произошло под катом? Давайте посмотрим вывод с подробным описанием:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });
 
//accumulator: sum=0; person=Andrew
//accumulator: sum=20; person=Igor
//accumulator: sum=43; person=Ira
//accumulator: sum=66; person=Vitia

Как видим, аккумулирующая функция делает всю работу. Сначала вызывается инициализирующая значение 0 и первый человек Андрей. В следующих трех вызовах «sum» увеличивается возраст до суммарного 78.

Подождите, что? Комбайнер никогда не вызывается? Выполнение этого стрима параллельно раскроет секрет:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });
 
//accumulator: sum=0; person=Ira
//accumulator: sum=0; person=Vitia
//accumulator: sum=0; person=Andrew
//accumulator: sum=0; person=Igor
//combiner: sum1=23; sum2=12
//combiner: sum1=20; sum2=23
//combiner: sum1=43; sum2=35

Выполнение этого потока параллельно приведет к совершенно иным результатам. Теперь комбайнер действительно вызывается. С тех пор как аккумулятор вызывается параллельно, комбайнеру необходимо суммировать отдельные значения.

Давайте глубже погрузимся в мир параллельных потоков в следующей главе.

Параллельные потоки

Потоки могут быть выполнены параллельно, чтобы увеличить производительность выполнения на большом количестве входных элементов. Параллельные потоки используют общий ForkJoinPool доступный через статический ForkJoinPool.commonPool() метод. Размер основного пула потоков использует до пяти нитей — в зависимости от количества доступных физических ядер процессора:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

На моей машине общий пул инициализируется с параллелизмом 3 по-умолчанию. Это значение может быть уменьшено или увеличено путем установки следующих параметров JVM:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Коллекции поддерживает метод parallelStream(), чтобы создать параллельный поток элементов. Кроме того, вы можете вызвать промежуточный метод parallel() на данном потоке, чтобы преобразовать последовательный поток в параллельной копии.

Для того, чтобы занизить поведение параллельного выполнения параллельного потока Следующий пример печатает информацию о текущем потоке в Sout :

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

После дебага мы получим лучшее понимание, какие потоки на самом деле используется для выполнения операций потока:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

Как вы можете видеть, параллельный поток использует все доступные темы из общей ForkJoinPool для выполнения операций потока. Вывод может отличаться при последовательном запуске, потому что поведение, которое конкретный поток использует не является детерминированным.

Давайте расширим например с помощью дополнительной операции потока — sort:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

Результат может быть странным на первый взгляд:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

Кажется, что sort выполняется последовательно только основной нитью. Но это не так. На самом деле, sort на параллельном потоке использует новый Java 8 метод Arrays.parallelSort() под капотом. Имейте в виду, что отладочный вывод относится только к исполнению переданного лямбда-выражения. Так, sort компаратор выполнен только на главном потоке, но фактическое алгоритм сортировки выполняется параллельно.

Возвращаясь к reduce, например, из последней секции. Мы уже выяснили, что функция комбайнер вызывается только параллельно, но не в последовательных потоках. Давайте посмотрим, какие потоки фактически участвуют:

List<Person> persons =
                Arrays.asList(
                        new Person("Andrew", 20),
                        new Person("Igor", 23),
                        new Person("Ira", 23),
                        new Person("Vitia", 12));
 
        persons
                .parallelStream()
                .reduce(0,
                        (sum, p) -> {
                            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                                    sum, p, Thread.currentThread().getName());
                            return sum += p.age;
                        },
                        (sum1, sum2) -> {
                            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                                    sum1, sum2, Thread.currentThread().getName());
                            return sum1 + sum2;
                        });

Вывод на консоль показывает, что оба аккумулятора и комбайнера функции выполняются параллельно на всех доступных потоках:

accumulator: sum=0; person=Ira [main]
accumulator: sum=0; person=Andrew [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Vitia [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=Igor [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12 [ForkJoinPool.commonPool-worker-3]
combiner: sum1=20; sum2=23 [ForkJoinPool.commonPool-worker-1]
combiner: sum1=43; sum2=35 [ForkJoinPool.commonPool-worker-1]

Таким образом, можно констатировать, что параллельные потоки могут дать хороший прирост производительности в потоках с большим количеством входных элементов. Но имейте в виду, что некоторые параллельные операции потока reduce и collect требуют дополнительные расчеты (комбинированные операции), которые не нужны при последовательном выполнении.

Ссылка на основную публикацию