Java concurrency (multi-threading). This article describes how to do concurrent programming with Java. It covers the concepts of parallel programming, immutability, threads, the executor framework (thread pools), futures, callables, CompletableFuture and the fork-join framework.

1. Concurrency

1.1. What is Concurrency?

Concurrency refers to the ability to execute multiple programs or parts of a program simultaneously. By performing time-consuming tasks asynchronously or in parallel, concurrency enhances both the throughput and interactivity of applications.

Modern computers typically feature multiple CPUs or cores within a single CPU. Effectively leveraging these multi-core architectures is crucial for building successful high-volume applications.

1.2. Process vs. Threads

A process operates independently and in isolation from other processes. It cannot directly access shared data in different processes. The operating system allocates resources, such as memory and CPU time, to each process.

In contrast, a thread is a lightweight process that shares the same memory space as other threads within the same process. Each thread has its own call stack and memory cache. When a thread accesses shared data, it temporarily stores this information in its cache, allowing for faster access.

In Java, applications typically run within a single process, but they can utilize multiple threads to achieve parallel processing and asynchronous behavior.

1.3. Limits of Concurrency Gains

In a Java application, multiple threads are utilized to achieve parallel processing and asynchronous behavior. Concurrency enables faster execution of certain tasks by dividing them into subtasks that can run simultaneously. However, the overall performance gain is constrained by the portion of the task that can be executed in parallel.

The theoretically possible performance gain can be calculated with the following rule, which is referred to as Amdahl’s Law.

If F is the fraction of the program which cannot run in parallel and N is the number of processors, then the maximum performance gain is 1 / (F + ((1 - F) / N)).
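
To get a feeling for this formula, the following minimal sketch evaluates it for a few processor counts. The class name AmdahlsLaw, the method maxSpeedup and the serial fraction of 0.1 are assumptions chosen for illustration only.

```java
/** Evaluates Amdahl's Law for a few processor counts (illustrative sketch). */
class AmdahlsLaw {

    /**
     * @param serialFraction F, the fraction of the program that cannot run in parallel (0..1)
     * @param processors     N, the number of processors
     * @return the maximum theoretical gain 1 / (F + ((1 - F) / N))
     */
    static double maxSpeedup(double serialFraction, int processors) {
        return 1.0 / (serialFraction + (1.0 - serialFraction) / processors);
    }

    public static void main(String[] args) {
        double f = 0.1; // assumption: 10% of the work is strictly sequential
        for (int n : new int[] {1, 2, 4, 8, 1000}) {
            System.out.printf("N = %4d -> maximum gain %.2f%n", n, maxSpeedup(f, n));
        }
    }
}
```

With F = 0.1 the gain approaches, but never reaches, 1 / F = 10, no matter how many processors are added.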

1.4. Concurrency Issues

While threads have their own call stacks, they can also access shared data, leading to two primary challenges: visibility and access problems.

  • Visibility Problem: This occurs when Thread A reads shared data that Thread B subsequently modifies, and Thread A remains unaware of this change. As a result, Thread A may operate on stale data, leading to inconsistencies.

  • Access Problem: This arises when multiple threads attempt to read and modify the same shared data simultaneously. This can result in data corruption or unpredictable behavior.

Visibility and access problems can lead to critical failures, including:

  • Liveness Failure: The program may become unresponsive due to issues in concurrent data access, such as deadlocks or starvation.

  • Safety Failure: The program may produce incorrect or inconsistent data due to improper handling of shared resources.

1.5. Processes and Threads

A Java program operates within its own process and typically runs in a single thread by default. Java provides built-in support for multi-threading through the Thread class, enabling the creation of new threads for concurrent execution.

Since Java 1.5, the java.util.concurrent package has enhanced support for concurrency, introducing more sophisticated mechanisms for thread management, synchronization, and communication.

1.6. Locks and Thread Synchronization

Java employs locks to ensure that specific sections of code are executed by only one thread at a time, preventing concurrent access issues. The simplest way to implement locking in Java is by using the synchronized keyword, which can be applied to methods or blocks of code.

The synchronized keyword guarantees the following:

  • Only one thread at a time can execute code that is guarded by the same lock, whether it is a synchronized method or a synchronized block.

  • Any thread entering a synchronized block will see the effects of all prior modifications that were protected by the same lock.

Synchronization is crucial for mutually exclusive access to shared resources and reliable communication between threads.

You can apply the synchronized keyword at the method level to restrict access, ensuring that only one thread can enter the method at a time. Any other thread attempting to call this method will be forced to wait until the first thread exits.

public synchronized void critical() {
    // Thread-critical operations go here
}

Alternatively, you can use the synchronized keyword to protect specific blocks of code within a method. These blocks are guarded by a lock, which can be any Java object.

All code protected by the same lock can only be executed by one thread at a time. For example, consider the following CrawledSites data structure, which ensures that only one thread can access the inner blocks of the add() and next() methods.

package de.vogella.pagerank.crawler;

import java.util.ArrayList;
import java.util.List;

/**
 * Data structure for a web crawler. Tracks visited sites and maintains a
 * list of sites yet to be crawled.
 *
 * @author Lars Vogel
 */
public class CrawledSites {
    private List<String> crawledSites = new ArrayList<>();
    private List<String> linkedSites = new ArrayList<>();

    public void add(String site) {
        synchronized (this) {
            if (!crawledSites.contains(site)) {
                linkedSites.add(site);
            }
        }
    }

    /**
     * Retrieves the next site to crawl. Returns null if there are no sites left.
     */
    public String next() {
        // The emptiness check happens inside the synchronized block:
        // ArrayList is not thread-safe, so even reads need the lock.
        synchronized (this) {
            if (linkedSites.isEmpty()) {
                return null;
            }
            String s = linkedSites.remove(0);
            crawledSites.add(s);
            return s;
        }
    }
}

1.7. Volatile

A variable declared with the volatile keyword guarantees that any thread reading the field will see the most recently written value. It is important to note that the volatile keyword does not provide mutual exclusion for the variable.

Since Java 5, a write to a volatile variable also makes all earlier writes by the same thread, including writes to non-volatile variables, visible to any thread that subsequently reads that volatile variable. This behavior can be leveraged to publish changes to an object that is referenced by a volatile field: make the changes through a temporary variable and then assign the temporary variable to the volatile field. The assignment ensures that both the new reference and the values of the object are visible to other threads.
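
A common use of volatile is a stop flag that one thread sets and another thread polls. The following is a minimal sketch of that pattern; the class StoppableWorker and its method names are hypothetical and only serve as illustration.

```java
/** A worker that polls a volatile flag so that a stop request from another thread is seen. */
class StoppableWorker implements Runnable {
    // volatile: a write by one thread is visible to later reads by other threads
    private volatile boolean running = true;
    // plain field, written only by the worker thread itself
    private long iterations;

    @Override
    public void run() {
        while (running) {
            iterations++;
        }
    }

    /** Called from another thread to ask the worker to finish. */
    void stop() {
        running = false;
    }

    /** Safe to call after the worker thread has been joined. */
    long getIterations() {
        return iterations;
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableWorker worker = new StoppableWorker();
        Thread thread = new Thread(worker);
        thread.start();
        Thread.sleep(100);   // let the worker run for a moment
        worker.stop();       // volatile write, seen by the worker's next read
        thread.join();
        System.out.println("Worker stopped after " + worker.getIterations() + " iterations");
    }
}
```

Note that volatile only solves the visibility problem here. The iterations counter is safe without further protection only because a single thread writes it.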

2. The Java memory model

2.1. Overview

The Java memory model describes the communication between the memory of the threads and the main memory of the application.

It defines the rules for how changes in memory made by one thread are propagated to other threads.

The Java memory model also defines the situations in which a thread refreshes its own memory from the main memory.

It also describes which operations are atomic and the ordering of operations.

2.2. Atomic operation

An atomic operation is an operation which is performed as a single unit of work without the possibility of interference from other operations.

The Java language specification guarantees that reading or writing a variable is an atomic operation (unless the variable is of type long or double). Operations on variables of type long or double are only atomic if they are declared with the volatile keyword.

Assume i is defined as int. The i++ (increment) operation is not an atomic operation in Java. This also applies to the other numeric types, e.g. long.

The i++ operation first reads the value which is currently stored in i (an atomic operation), adds one to it, and then writes the result back to i (also an atomic operation). But between the read and the write the value of i might have been changed by another thread.

Since Java 1.5, the java.util.concurrent.atomic package provides atomic variables, e.g. AtomicInteger or AtomicLong, which offer atomic methods such as getAndDecrement(), getAndIncrement() and getAndSet().
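
As a small illustration, the following sketch lets several threads increment one shared AtomicInteger. Because incrementAndGet() is an atomic read-modify-write, no update is lost. The class name AtomicCounterDemo and the thread counts are assumptions for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {

    /** Starts threadCount threads; each increments a shared counter incrementsPerThread times. */
    static int countWithThreads(int threadCount, int incrementsPerThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            thread.join(); // wait until every thread has finished
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithThreads(4, 100_000)); // prints 400000
    }
}
```

With a plain int field and i++ instead of the AtomicInteger, the printed value could be lower than 400000, because concurrent increments may overwrite each other.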

2.3. Memory updates in synchronized code

The Java memory model guarantees that each thread entering a synchronized block of code sees the effects of all previous modifications that were guarded by the same lock.
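
The following sketch shows this guarantee with a counter whose read and write methods are both synchronized on the same object. The class SynchronizedCounter is a hypothetical example, not part of the JDK.

```java
/** A counter whose state is only ever accessed while holding the lock on "this". */
class SynchronizedCounter {
    private int value; // guarded by "this"

    synchronized void increment() {
        value++; // the read-modify-write happens entirely under the lock
    }

    synchronized int get() {
        return value; // same lock, so all earlier increments are visible
    }

    /** Lets threadCount threads increment one counter and returns the final value. */
    static int countWithThreads(int threadCount, int incrementsPerThread) throws InterruptedException {
        SynchronizedCounter counter = new SynchronizedCounter();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment();
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithThreads(4, 50_000)); // prints 200000
    }
}
```

It is important that get() is synchronized as well. A reader that does not acquire the same lock gets no visibility guarantee for the value written by other threads.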

3. Immutability and defensive Copies

3.1. Immutability

The simplest way to avoid problems with concurrency is to share only immutable data between threads. Immutable data is data which cannot be changed.

To make a class immutable, declare the class and all its fields as final.

Also ensure that no reference to a field escapes during construction. Therefore, every field must:

  • be private

  • have no setter method

  • be copied in the constructor if it is a mutable object to avoid changes of this data from outside

  • never be directly returned or otherwise exposed to a caller

  • not change or if a change happens this change must not be visible outside

An immutable class may have some mutable data which is used to manage its state but from the outside neither this class nor any attribute of this class can get changed.

For all mutable fields, e.g. arrays, that are passed from the outside to the class during the construction phase, the class needs to make a defensive copy of the elements to make sure that no other object from the outside can change the data.
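
The rules above can be sketched as follows. The class Measurement and its fields are hypothetical and only illustrate the pattern: a final class, private final fields, no setters, and defensive copies of the array on the way in and on the way out.

```java
import java.util.Arrays;

/** An immutable value class that holds a name and an array of values. */
final class Measurement {
    private final String name;   // String is itself immutable
    private final int[] values;  // mutable type, therefore never shared directly

    Measurement(String name, int[] values) {
        this.name = name;
        this.values = values.clone(); // defensive copy of the caller's array
    }

    String getName() {
        return name;
    }

    int[] getValues() {
        return values.clone(); // hand out a copy, never the internal array
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3};
        Measurement m = new Measurement("sample", input);
        input[0] = 99;           // does not affect the Measurement
        m.getValues()[1] = 42;   // only changes the returned copy
        System.out.println(m.getName() + " " + Arrays.toString(m.getValues())); // prints sample [1, 2, 3]
    }
}
```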

3.2. Defensive Copies

You must protect your classes from calling code. Assume that calling code will do its best to change your data in ways you did not expect. While this is especially true in the case of immutable data, it is also true for non-immutable data which you don’t expect to be changed from outside your class.

To protect your class against that, you should copy data which you receive and only return copies of data to calling code.

The following example returns only an unmodifiable copy of its internal list (ArrayList). This way the client of this class can neither add elements to nor remove elements from the internal list.

package de.vogella.performance.defensivecopy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MyDataStructure {
    private final List<String> list = new ArrayList<String>();

    public void add(String s) {
        list.add(s);
    }

    /**
     * Makes a defensive copy of the list and returns it wrapped as an
     * unmodifiable list. This way the caller cannot modify the internal list.
     *
     * @return an unmodifiable copy of the list
     */
    public List<String> getList() {
        return Collections.unmodifiableList(new ArrayList<String>(list));
    }
}

4. Threads in Java

The basic means for concurrency is the java.lang.Thread class. A Thread executes an object of type java.lang.Runnable.

Runnable is an interface which defines the run() method. This method is called by the Thread object and contains the work which should be done. Therefore the Runnable is the task to perform. The Thread is the worker who is doing this task.

The following demonstrates a task (Runnable) which counts the sum of a given range of numbers. Create a new Java project called de.vogella.concurrency.threads for the example code of this section.

package de.vogella.concurrency.threads;

/**
 * MyRunnable will count the sum of the numbers from 1 up to (but not including)
 * the parameter countUntil and then write the result to the console.
 * <p>
 * MyRunnable is the task which will be performed
 * 
 * @author Lars Vogel
 * 
 */
public class MyRunnable implements Runnable {
    private final long countUntil;

    MyRunnable(long countUntil) {
        this.countUntil = countUntil;
    }

    @Override
    public void run() {
        long sum = 0;
        for (long i = 1; i < countUntil; i++) {
            sum += i;
        }
        System.out.println(sum);
    }
}

The following example demonstrates the usage of the Thread class and the Runnable interface.

package de.vogella.concurrency.threads;

import java.util.ArrayList;
import java.util.List;

public class Main {

    public static void main(String[] args) {
        // We will store the threads so that we can check if they are done
        List<Thread> threads = new ArrayList<Thread>();
        // We will create 500 threads
        for (int i = 0; i < 500; i++) {
            Runnable task = new MyRunnable(10000000L + i);
            Thread worker = new Thread(task);
            // We can set the name of the thread
            worker.setName(String.valueOf(i));
            // Start the thread, never call the run() method directly
            worker.start();
            // Remember the thread for later usage
            threads.add(worker);
        }
        int running = 0;
        do {
            running = 0;
            for (Thread thread : threads) {
                if (thread.isAlive()) {
                    running++;
                }
            }
            System.out.println("We have " + running + " running threads. ");
        } while (running > 0);

    }
}

Using the Thread class directly has the following disadvantages:

  • Creating a new thread causes some performance overhead.

  • Too many threads can lead to reduced performance, as the CPU needs to switch between these threads.

  • You cannot easily control the number of threads, therefore you may run into out of memory errors due to too many threads.

The java.util.concurrent package offers improved support for concurrency compared to the direct usage of threads. This package is described in the next section.

5. Thread pools with the Executor Framework

You find these examples in the source section in the Java project called de.vogella.concurrency.threadpools.

Thread pools manage a pool of worker threads. The thread pools contain a work queue which holds tasks waiting to get executed.

A thread pool can be described as a collection of Runnable objects (work queue) and a collection of running threads.

These threads are constantly running and are checking the work queue for new work. If there is new work to be done, they execute this Runnable. The Executor interface provides a method, execute(Runnable r), to add a new Runnable object to the work queue.

The Executor framework provides implementations of the java.util.concurrent.Executor interface through the Executors factory class, e.g. Executors.newFixedThreadPool(int n), which will create n worker threads. The ExecutorService adds life cycle methods to the Executor, which allow you to shut down the Executor and to wait for termination.

If you want to use one thread pool with one thread which executes several runnables you can use the Executors.newSingleThreadExecutor() method.

Create the Runnable again.

package de.vogella.concurrency.threadpools;

/**
 * MyRunnable will count the sum of the numbers from 1 up to (but not including)
 * the parameter countUntil and then write the result to the console.
 * <p>
 * MyRunnable is the task which will be performed
 * 
 * @author Lars Vogel
 * 
 */
public class MyRunnable implements Runnable {
    private final long countUntil;

    MyRunnable(long countUntil) {
        this.countUntil = countUntil;
    }

    @Override
    public void run() {
        long sum = 0;
        for (long i = 1; i < countUntil; i++) {
            sum += i;
        }
        System.out.println(sum);
    }
}

Now you run your runnables with the executor framework.

package de.vogella.concurrency.threadpools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
    private static final int NTHREADS = 10;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
        for (int i = 0; i < 500; i++) {
            Runnable worker = new MyRunnable(10000000L + i);
            executor.execute(worker);
        }
        // This will make the executor accept no new tasks
        // and finish all tasks already in the queue
        executor.shutdown();
        // Wait until all tasks have finished (here: at most one hour)
        if (executor.awaitTermination(1, TimeUnit.HOURS)) {
            System.out.println("Finished all tasks");
        } else {
            System.out.println("Timed out before all tasks finished");
        }
    }
}

If the tasks should return a value (result-bearing tasks), you can use the java.util.concurrent.Callable interface.
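
The following minimal sketch submits several Callable tasks to a thread pool and collects their results through Future objects. The class name CallableFutures, the helper methods and the pool size of 4 are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableFutures {

    /** Creates a task which sums the numbers 1..countUntil (inclusive) and returns the result. */
    static Callable<Long> sumTask(final long countUntil) {
        return () -> {
            long sum = 0;
            for (long i = 1; i <= countUntil; i++) {
                sum += i;
            }
            return sum;
        };
    }

    /** Runs sumTask(1) .. sumTask(taskCount) on a pool and adds up all results. */
    static long sumOfAll(int taskCount) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4); // assumed pool size
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 1; i <= taskCount; i++) {
                futures.add(executor.submit(sumTask(i))); // submit() returns a Future
            }
            long total = 0;
            for (Future<Long> future : futures) {
                total += future.get(); // blocks until this task's result is available
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfAll(10)); // prints 220
    }
}
```

Unlike execute(Runnable), submit(Callable) hands back a Future. An exception thrown inside the task is reported as an ExecutionException when get() is called.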

6. CompletableFuture

Any time-consuming task should preferably be done asynchronously. Two basic approaches to asynchronous task handling are available to a Java application:

  • application logic blocks until a task completes

  • application logic is called once the task completes; this is called a nonblocking approach.

CompletableFuture, which implements the Future interface, supports asynchronous calls. It also implements the CompletionStage interface. CompletionStage offers methods that let you attach callbacks that will be executed on completion.

It adds standard techniques for executing application code when a task completes, including various ways to combine tasks. CompletableFuture supports both blocking and nonblocking approaches, including regular callbacks.

Such a callback can be executed in a different thread than the one in which the CompletableFuture is executed.

The following example demonstrates how to create a basic CompletableFuture.

CompletableFuture.supplyAsync(this::doSomething);

CompletableFuture.supplyAsync runs the task asynchronously, by default on Java's common thread pool (ForkJoinPool.commonPool()). You can optionally pass your own Executor as a second argument to run the task on a custom thread pool.

package snippet;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureSimpleSnippet {
    public static void main(String[] args) {
        long started = System.currentTimeMillis();

        // configure CompletableFuture
        CompletableFuture<Integer> futureCount = createCompletableFuture();

        // continue to do other work
        System.out.println("Took " + (System.currentTimeMillis() - started) + " milliseconds");

        // now it's time to get the result
        try {
            int count = futureCount.get();
            System.out.println("CompletableFuture took " + (System.currentTimeMillis() - started) + " milliseconds");
            System.out.println("Result " + count);
        } catch (InterruptedException | ExecutionException ex) {
            // Exceptions from the future should be handled here
            ex.printStackTrace();
        }
    }

    private static CompletableFuture<Integer> createCompletableFuture() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // simulate long running task
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // restore the interrupt flag so that callers can react to it
                Thread.currentThread().interrupt();
            }
            return 20;
        });
    }
}
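
supplyAsync also has an overload that takes an Executor as its second argument. The following sketch runs the task on a custom pool instead of the default one. The class name CustomExecutorExample, the pool size and the thread name custom-pool-worker are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorExample {

    /** Runs a task on a custom pool and returns the name of the thread that executed it. */
    static String runOnCustomPool() {
        // A small pool whose threads carry a recognizable (assumed) name
        ExecutorService pool = Executors.newFixedThreadPool(2,
                runnable -> new Thread(runnable, "custom-pool-worker"));
        try {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(
                    () -> Thread.currentThread().getName(), pool);
            return future.join(); // like get(), but throws only unchecked exceptions
        } finally {
            pool.shutdown(); // otherwise the non-daemon pool threads keep the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran on: " + runOnCustomPool());
    }
}
```

A dedicated executor is useful, for example, when tasks block for a long time and should not occupy the threads of the shared default pool.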

The thenApply method can be used to define a callback which is executed once the task started with CompletableFuture.supplyAsync finishes. The usage of the thenApply method is demonstrated by the following code snippet.

package snippet;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCallback {
    public static void main(String[] args) {
        // each thenApply runs once the previous stage has completed
        CompletableFuture<String> data = createCompletableFuture()
                .thenApply((Integer count) -> {
                    int transformedValue = count * 10;
                    return transformedValue;
                })
                .thenApply(transformed -> "Finally creates a string: " + transformed);

        try {
            System.out.println(data.get());
        } catch (InterruptedException | ExecutionException e) {
            // Exceptions from any stage of the chain end up here
            e.printStackTrace();
        }
    }

    public static CompletableFuture<Integer> createCompletableFuture() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // simulate long running task
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // restore the interrupt flag so that callers can react to it
                Thread.currentThread().interrupt();
            }
            return 20;
        });
    }
}
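
Besides chaining callbacks, two independent tasks can also be combined. The following minimal sketch uses thenCombine to merge the results of two futures once both have completed. The class name CombineFutures and the values 20 and 3 are made up for this example.

```java
import java.util.concurrent.CompletableFuture;

class CombineFutures {

    /** Starts two independent tasks and multiplies their results once both are done. */
    static int combined() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 20);
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 3);

        // The function runs only after both futures have completed
        CompletableFuture<Integer> total = price.thenCombine(quantity, (p, q) -> p * q);

        return total.join(); // blocks until the combined result is available
    }

    public static void main(String[] args) {
        System.out.println("Total: " + combined()); // prints Total: 60
    }
}
```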

As of Java 9, you can also start a CompletableFuture with a delay.

CompletableFuture<Integer> future = new CompletableFuture<>();
 future.completeAsync(() -> {
       System.out.println("inside future: processing data...");
       return 1;
 }, CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS))
 .thenAccept(result -> System.out.println("accept: " + result));

7. Nonblocking algorithms

Java 5.0 provides support for additional atomic operations. This allows you to develop non-blocking algorithms, i.e. algorithms which do not require synchronization but are based on low-level atomic hardware primitives such as compare-and-swap (CAS). A compare-and-swap operation checks whether a variable has a certain expected value and, only if it does, atomically replaces it with a new value.

Non-blocking algorithms are typically faster than blocking algorithms, as the synchronization of threads happens at a much finer level (hardware).

For example, the following creates a non-blocking counter which always increases. This example is contained in the project called de.vogella.concurrency.nonblocking.counter.

package de.vogella.concurrency.nonblocking.counter;

import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    private AtomicInteger value = new AtomicInteger(); 
    public int getValue(){
        return value.get();
    }
    public int increment(){
        return value.incrementAndGet();
    }
    
    // Alternative implementation of increment() which makes the
    // underlying compare-and-set loop explicit
    public int incrementLongVersion(){
        int oldValue = value.get();
        while (!value.compareAndSet(oldValue, oldValue+1)){
             oldValue = value.get();
        }
        return oldValue+1;
    }
    
}

And a test.

package de.vogella.concurrency.nonblocking.counter;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
        private static final int NTHREADS = 10;

        public static void main(String[] args) {
            final Counter counter = new Counter();
            List<Future<Integer>> list = new ArrayList<Future<Integer>>();

            ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
            for (int i = 0; i < 500; i++) {
                Callable<Integer> worker = new  Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        int number = counter.increment();
                        System.out.println(number );
                        return number ;
                    }
                };
                Future<Integer> submit= executor.submit(worker);
                list.add(submit);

            }
            
            
            // This will make the executor accept no new tasks
            // and finish all tasks already in the queue
            executor.shutdown();
            // Wait until all tasks have finished
            while (!executor.isTerminated()) {
            }
            Set<Integer> set = new HashSet<Integer>();
            for (Future<Integer> future : list) {
                try {
                    set.add(future.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            if (list.size()!=set.size()){
                throw new RuntimeException("Double-entries!!!"); 
            }

        }


}

The interesting part is how the incrementAndGet() method is implemented. In JDK versions before Java 8 it used a CAS loop like the following; newer JDKs delegate to a lower-level atomic add operation.

public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}

The JDK itself makes more and more use of non-blocking algorithms to increase performance for every developer. Developing correct non-blocking algorithms is not a trivial task.

For more information on non-blocking algorithms, e.g. examples for a non-blocking Stack and non-blocking LinkedList, please see http://www.ibm.com/developerworks/java/library/j-jtp04186/index.html .

8. Fork-Join in Java 7

Java 7 introduced a new parallel mechanism for compute intensive tasks, the fork-join framework. The fork-join framework allows you to distribute a certain task on several workers and then wait for the result.

For Java 6.0 you can download the package (jsr166y) from the Download site.

For testing create the Java project "de.vogella.performance.forkjoin". If you are not using Java 7 you also need to add jsr166y.jar to the classpath.

First create an algorithm package and then the following class.

package algorithm;

import java.util.Random;

/**
 * 
 * This class defines a long list of integers which defines the problem we will
 * later try to solve
 * 
 */
public class Problem {
    private final int[] list = new int[2000000];

    public Problem() {
        Random generator = new Random(19580427);
        for (int i = 0; i < list.length; i++) {
            list[i] = generator.nextInt(500000);
        }
    }

    public int[] getList() {
        return list;
    }

}

Define now the Solver class as shown in the following example coding.

The API defines other top classes, e.g. RecursiveAction, AsyncAction. Check the Javadoc for details.
package algorithm;

import java.util.Arrays;

import jsr166y.forkjoin.RecursiveAction;

public class Solver extends RecursiveAction {
    private int[] list;
    public long result;

    public Solver(int[] array) {
        this.list = array;
    }

    @Override
    protected void compute() {
        if (list.length == 1) {
            result = list[0];
        } else {
            int midpoint = list.length / 2;
            int[] l1 = Arrays.copyOfRange(list, 0, midpoint);
            int[] l2 = Arrays.copyOfRange(list, midpoint, list.length);
            Solver s1 = new Solver(l1);
            Solver s2 = new Solver(l2);
            forkJoin(s1, s2);
            result = s1.result + s2.result;
        }
    }
}

Now define a small test class for testing it efficiently.

package testing;

import jsr166y.forkjoin.ForkJoinExecutor;
import jsr166y.forkjoin.ForkJoinPool;
import algorithm.Problem;
import algorithm.Solver;

public class Test {

    public static void main(String[] args) {
        Problem test = new Problem();
        // check the number of available processors
        int nThreads = Runtime.getRuntime().availableProcessors();
        System.out.println(nThreads);
        Solver mfj = new Solver(test.getList());
        ForkJoinExecutor pool = new ForkJoinPool(nThreads);
        pool.invoke(mfj);
        long result = mfj.result;
        System.out.println("Done. Result: " + result);
        long sum = 0;
        // check if the result was ok
        for (int i = 0; i < test.getList().length; i++) {
            sum += test.getList()[i];
        }
        System.out.println("Done. Result: " + sum);
    }
}
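
The listings above are written against the early jsr166y preview API. As a hedged sketch of the same idea with the fork-join classes that ship in java.util.concurrent since Java 7, the following example sums an array with a RecursiveTask. The class name SumTask and the threshold of 10,000 elements are assumptions. Instead of copying sub-arrays down to single elements, the task works on index ranges and falls back to a sequential loop for small ranges.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/** Sums a range of an int array by recursively splitting the range in two halves. */
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000; // assumed cut-off for sequential work
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // compute the right half in this thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    /** Convenience method: sums the whole array on a new pool. */
    static long parallelSum(int[] data) {
        ForkJoinPool pool = new ForkJoinPool(); // defaults to the number of available processors
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[2_000_000];
        Arrays.fill(data, 1);
        System.out.println("Result: " + parallelSum(data)); // prints Result: 2000000
    }
}
```

Working on index ranges avoids allocating a new array at every split, and the threshold keeps the number of tiny tasks small.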

9. Deadlock

A concurrent application has the risk of a deadlock. A set of threads is deadlocked if every thread in the set is waiting for an event which only another thread in the same set can cause.

For example, if thread A waits for a lock on object Z which thread B holds, and thread B waits for a lock on object Y which is held by thread A, then these two threads are deadlocked and cannot continue their processing.

This can be compared to a traffic jam, where cars (threads) require access to a certain street (resource) which is currently blocked by another car (lock).
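
One common way to rule out the situation described above is to make every thread acquire the locks in the same order. The following is a minimal sketch of that idea; the class LockOrdering and the lock names LOCK_Y and LOCK_Z are hypothetical and simply mirror the objects Y and Z from the example.

```java
/** Two threads need both locks; a fixed acquisition order prevents a deadlock. */
class LockOrdering {
    private static final Object LOCK_Y = new Object();
    private static final Object LOCK_Z = new Object();
    private static int counter; // guarded by LOCK_Y and LOCK_Z

    /**
     * Every caller takes LOCK_Y first and LOCK_Z second. If one thread took
     * LOCK_Z first instead, each thread could end up holding one lock while
     * waiting forever for the other one.
     */
    static void update() {
        synchronized (LOCK_Y) {
            synchronized (LOCK_Z) {
                counter++;
            }
        }
    }

    /** Runs two threads that both call update() and returns the final counter value. */
    static int runTwoThreads(int updatesPerThread) throws InterruptedException {
        counter = 0;
        Runnable task = () -> {
            for (int i = 0; i < updatesPerThread; i++) {
                update();
            }
        };
        Thread a = new Thread(task, "A");
        Thread b = new Thread(task, "B");
        a.start();
        b.start();
        a.join();
        b.join();
        return counter; // join() makes the threads' updates visible here
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTwoThreads(100_000)); // prints 200000
    }
}
```

Because no thread ever holds LOCK_Z while waiting for LOCK_Y, a cycle of waiting threads cannot form.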
