Parallelism

How to Convert a Regular Stream into a Parallel Stream

You can make a regular ⚪ Stream<T> a parallel ⚪ Stream<T> in two ways:

  • by calling ⚪ Collection<E>#parallelStream() directly on the source collection

  • by calling ⚪ Stream<T>#parallel() on an existing stream
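A minimal sketch of both ways (the list and its content are illustrative):

List<Integer> numbers = List.of(1, 2, 3, 4);

Stream<Integer> s1 = numbers.parallelStream();    // parallel from the source collection
Stream<Integer> s2 = numbers.stream().parallel(); // parallel via the intermediate call

System.out.println(s1.isParallel()); // true
System.out.println(s2.isParallel()); // true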

These calls are going to set a special bit in the ⚪ Stream<T> that will trigger the computation in parallel when you call your terminal operation.

After the terminal operation is called, the ⚪ Stream<T> implementation will check this special bit:

  • if it is set to 1, then the computation will happen in parallel, using special parallel algorithms

  • otherwise it will be processed sequentially

JMH (Java Microbenchmark Harness)

The standard tool to measure the performance of your Java code. You can find the JMH template project here.

To properly measure the performance of a Java application, you first need to run it a certain number of times (warmup), because the JVM contains a special just-in-time compiler (C2) that can optimize your code a lot as it runs.
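A minimal benchmark sketch (the class name and the measured pipeline are illustrative; the annotations come from org.openjdk.jmh.annotations):

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.*;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5)       // warmup iterations give the C2 compiler time to optimize
@Measurement(iterations = 5)  // only these iterations are measured
@Fork(1)
public class SumBenchmark {

    int[] data;

    @Setup
    public void setup() {
        data = IntStream.range(0, 1_000_000).toArray();
    }

    @Benchmark
    public long sumSequential() {
        return IntStream.of(data).asLongStream().sum();
    }

    @Benchmark
    public long sumParallel() {
        return IntStream.of(data).asLongStream().parallel().sum();
    }
}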

Getting the best performance gains from parallel ⚪ Stream<T>

Beware, you’ll have two unboxing operations and one boxing operation here (which has an impact on performance!):

Integer j = 3;
Integer l = 4;
Integer k = j + l; // j and l are unboxed to int, then the sum is boxed back into an Integer

Similarly, iterating over an int[] and doing some operation on its elements will be much faster than doing the same over an 🔴 Integer[] (due to boxing and unboxing).
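For example, summing both kinds of arrays (a sketch; the size is illustrative):

int[] primitives = new int[1_000_000];
Integer[] boxed = new Integer[1_000_000];
for (int i = 0; i < primitives.length; i++) {
    primitives[i] = i;
    boxed[i] = i; // autoboxing on every write
}

long sum1 = 0;
for (int v : primitives) sum1 += v;  // reads the int values directly from the array

long sum2 = 0;
for (Integer v : boxed) sum2 += v;   // follows a reference, then unboxes, for every element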

Each physical core of the CPU accesses two levels of cache, which are private:

  • L1 - the first level of cache (small, very fast)

  • L2 - the second level of cache (bigger, a little slower).

L3 - the third level of cache is shared by all physical cores (bigger still, a little slower than L2).

Main memory lives outside the CPU and is accessed much more slowly than L3.

How does a CPU core access data from the main memory?

This data will have to be transferred first to the L3 cache, then to the L2 cache, and then to the L1 cache before being available to this CPU core.

Memory is not read one value at a time. If a CPU core wants to access just an int, a whole line of memory will be transferred, not just that int. Memory is transferred line by line from the main memory to the different levels of cache. Each line is typically 64 bytes (8 longs or 16 ints).

When we have an array of references to 🔴 Integer instances that hold the values:

  • In one read operation we are only going to transfer the references to these objects.

  • Then, to get all the values, it may take several more reads, up to a maximum of 16.

This is called Pointer Chasing, and it is something you need to avoid. To get all the values, we have to follow pointers to other places in memory, which has a cost due to the way data is transferred from the main memory to the CPU caches.

Iterating over a 🟢 LinkedList<E> implies much more Pointer Chasing than iterating over an 🟢 ArrayList<E>. 🟢 LinkedList<E> does not store the references to the 🔴 Integer objects in an array; it stores them in a linked list of node objects. So first you need to get the reference to the first node object, and then, for each element, you need to follow two pointers: one to the next node object and one to the 🔴 Integer itself.

Cache Miss

Whenever a CPU core needs a value and that value turns out not to be in the cache. During a cache miss, if the value is not in the L2 or L3 cache either, the CPU may decide to suspend the thread that is executing this computation and replace it with another thread. This is a much bigger performance hit than pointer chasing alone, but it is still a consequence of pointer chasing.

This is why 🟢 LinkedList<E> is called a cache-unfriendly structure, whereas 🟢 ArrayList<E> is called a cache-friendly structure.

Fork/Join Framework implementation of parallel ⚪ Stream<T>

The Fork/Join Framework was introduced in Java 7 and slightly modified in Java 8.

Going parallel means that the task is split into subtasks, and each subtask produces a partial result (the fork step). Once those partial results have been computed, they are sent back to the main task, which has the responsibility of joining them (the join step).

All those tasks are sent to a special pool of threads called the Fork/Join Pool, and this pool has special capabilities to enable the computation of those subtasks in parallel.

The Fork/Join Framework decides whether a task is too big and should be split, or small enough and should be computed directly.
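This split-or-compute decision is what you write yourself when you use the framework directly. A minimal sketch with 🟢 RecursiveTask (the threshold value is illustrative):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000;
    private final int[] data;
    private final int from, to;

    SumTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {         // small enough: compute directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;          // too big: split into two subtasks
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                          // fork step
        return right.compute() + left.join(); // join step
    }
}

// usage: new ForkJoinPool().invoke(new SumTask(array, 0, array.length))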

Fork/Join Pool

A pool of threads, an instance of the 🟢 ForkJoinPool class:

  • created when the JVM is created

  • there is only one common 🟢 ForkJoinPool in the JVM since Java 8

  • it is used for all the parallel streams

  • its size defaults to the number of virtual cores (not necessarily a good thing) and can be changed with the java.util.concurrent.ForkJoinPool.common.parallelism system property, as sketched below
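A minimal sketch of overriding the common pool size (the value 2 is illustrative; the property must be set before the common pool is first used):

// on the command line:
//   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 MyApp

// or programmatically, early in main(), before any parallel stream runs:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
System.out.println(ForkJoinPool.commonPool().getParallelism()); // 2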

Suppose we have a common Fork/Join Pool with 2 threads in it. Each thread is also associated with a waiting queue that can accept tasks. Suppose we have a task to compute:

  1. The first step is to split this task (A) into subtasks (A11, A12).

  2. Since the parent task (A) is waiting for the partial results computed by its subtasks, it is put at the end of the waiting queue, freeing the first thread 1️⃣, which can then handle task A11.

  3. Since the second thread 2️⃣ is not working, it is able to steal tasks from another waiting queue. So thread 2️⃣ will steal task A12 from thread 1️⃣, which is busy with task A11. All threads are now busy.

    The Fork/Join Framework here implements a trick that is quite classical in concurrent programming, called work stealing.
  4. Once all the subtasks have finished their computation, the results are passed back to task A, which applies the reduce operation.

You need to benchmark your own computation to tell where the sweet spot is in your use case. E.g. computing the sum of 10 integers in an array is much faster sequentially than in parallel, whereas computing the sum of several million integers in an array will be much faster in parallel.

You also need to check that on a machine that is as close as possible to your production machine.

Synchronization

A feature of the Java language which prevents two threads from executing the same piece of code at the same time.

In the Stream API there are operations (both intermediate and terminal) that need to exchange data or information with the other threads; they perform hidden synchronization, which is a bottleneck, e.g. the ⚪ Stream<T>#findFirst() (⚪ Stream<T>#findAny() will give you much better performance in parallel) and ⚪ Stream<T>#limit(long maxSize) methods.
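For example, when any matching element will do (a sketch; numbers is an illustrative ⚪ List<Integer>):

// findFirst() must return the first matching element in encounter order,
// which forces the threads to coordinate
Optional<Integer> first = numbers.parallelStream()
    .filter(i -> i % 2 == 0)
    .findFirst();

// findAny() may return any matching element, so no such coordination is needed
Optional<Integer> any = numbers.parallelStream()
    .filter(i -> i % 2 == 0)
    .findAny();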

If the ⚪ BinaryOperator<T> you pass to ⚪ Stream<T>#reduce(BinaryOperator<T> accumulator) is not associative, then you are going to compute wrong results in both sequential and parallel computations. In parallel, you can also get a different result on each run.

Associative

Means that grouping the operands differently gives the same result: (a op b) op c must equal a op (b op c). Example:

stream.reduce(0, (i1, i2) -> i1*i1 + i2*i2); (1) (2) (3)
1 Given: [1, 1, 1, 1]
2 It should return 4, the sum of the squares
3 Since this ⚪ BinaryOperator<T> is not associative, it will actually compute 1 (0+1), then 2 (1+1), then 5 (4+1), and it will return 26 (25+1)
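A sketch of an associative way to get the intended result: square each element in a map step, then reduce with addition, which is associative:

int sumOfSquares = stream.map(i -> i * i).reduce(0, Integer::sum); // returns 4 for [1, 1, 1, 1]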

You can display the threads executing your parallel stream with:

Set<String> threadNames = ConcurrentHashMap.newKeySet();
stream.parallel()
    .(...)
    .peek(i -> threadNames.add(Thread.currentThread().getName()))
    .(...);
threadNames.forEach(System.out::println);
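When the stream runs on the common pool, you will typically see names like ForkJoinPool.commonPool-worker-1, plus the name of the calling thread (e.g. main), because the thread that invokes the terminal operation also takes part in the computation.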

You can execute a parallel stream in a custom 🟢 ForkJoinPool, display the threads executing your parallel stream and count the number of tasks each thread executed with:

Map<String, Long> threadMap = new ConcurrentHashMap<>();
Callable<Integer> task = () -> {
    int result = stream.parallel()
        .(...)
        .peek(i -> threadMap.merge(Thread.currentThread().getName(), 1L, Long::sum))
        .(...);
    return result;
};

ForkJoinPool forkJoinPool = new ForkJoinPool(4); (1)
ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
submit.get();
threadMap.forEach((name, n) -> System.out.println(name + " -> " + n));
forkJoinPool.shutdown();
1 Custom 🟢 ForkJoinPool with 4 threads

Choosing the right sources of data to efficiently go parallel

The Fork step of the Fork/Join Framework works best when two assumptions hold:

  • The number of elements is known before processing them. These sources of data do not meet this condition:

    • streams built on a 🟢 Pattern

    • the lines of a text file

    • ⚪ Iterator<T>-based sources

  • Reaching the center of the data must be easy, reliable and efficient. This source of data does not meet this condition:

    • 🟢 LinkedList<E>

When splitting a ⚪ Set<E>, the Fork/Join Framework splits the backing array in two pieces without knowing whether there is the same amount of data in the first half and in the second half. This information is not available unless you count all the elements, one by one, in each half, and the framework does not do that because it would be too costly.

Those two halves are going to be split again, and again, and again. Some of the segments of this array are going to be empty, and the imbalance between the segments becomes higher and higher as the number of splits increases. This is a problem with ⚪ Set<E>-like structures.

Sizeable

The number of elements of the source is known. All the collections and arrays are sizeable, but streams built on patterns, lines of text files, and iterators are not sizeable.

Subsizeable

The number of elements in both parts of a split source is known. Sets are sizeable, but they are not subsizeable.
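These two properties map to the SIZED and SUBSIZED characteristics of the ⚪ Spliterator<T> that backs the stream; a quick check (a sketch):

Spliterator<Integer> fromList = new ArrayList<>(List.of(1, 2, 3, 4)).spliterator();
System.out.println(fromList.hasCharacteristics(Spliterator.SIZED));    // true
System.out.println(fromList.hasCharacteristics(Spliterator.SUBSIZED)); // true

Spliterator<Integer> fromSet = new HashSet<>(List.of(1, 2, 3, 4)).spliterator();
System.out.println(fromSet.hasCharacteristics(Spliterator.SIZED));     // true
System.out.println(fromSet.hasCharacteristics(Spliterator.SUBSIZED));  // false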

Processing data from a ⚪ List<E> is much faster than processing data from a ⚪ Set<E>, because iterating over the elements of a ⚪ List<E> is faster than iterating over the elements of a ⚪ Set<E>.

Going parallel on a ⚪ List<E> will bring you more performance gain than going parallel on a ⚪ Set<E>.

You may have data to process that, in fact, is not well spread over the buckets of the array that is backing your ⚪ Set<E> (e.g. strings that all return the same hash code will end up in the same bucket and be handled by only one thread).

Are you sure that your threads should be used to compute your streams in parallel?

In the case of a web application, your threads are used to serve your HTTP clients, so you don’t want those threads to be used for anything else, including parallel streams. The same goes for threads used for SQL transactions. Threads are precious resources.