Java多线程基础
2018-09-07 | 分类 Multithread | 标签 Java Multithread

前言

这篇文章,是对Java多线程编程的基础性介绍。

本文将介绍Java语言为支持多线程编程提供的一些特性。通过这篇文章,您将了解到如何通过Java语言创建一个线程,如何通过内置的锁来实现线程间的同步,如何在线程间进行通信以及线程的中断机制。

什么是线程

线程是操作系统调度的最小单位,在一个进程中,一般至少有一个线程在运行。一个进程中包含的多个线程,在多核处理器中,操作系统可以将多个线程调度到不同的CPU核心上运行,多个线程可以并行运行。

多线程基础

在同一个进程中的多个线程,共享同一个进程空间,这意味着,线程间通信的成本相对进程间会低很多,但是由于可以同时访问同一个内存地址,所以不正确的同步可能会导致数据竞争而发生错误。

创建一个线程

在Java中,创建一个线程最简单的方式是继承 java.lang.Thread 类。通过重写 Thread 类的run()方法,可以将线程中需要执行的代码放到run()方法中,这样,当这个线程被启动以后,可以在新启动的线程中执行这些逻辑。

public class SimpleThread extends Thread {
    @Override
    public void run() {
        // TODO 线程需要运行的代码
    }
    
    public static void main(String ...args) throws Exception {
        Thread t = new SimpleThread();
        // 启动线程
        t.start();
        
        t.join();
    }
}

上面的代码中,我们通过继承 Thread 创建了一个 SimpleThread 类,然后重写了 Threadrun()方法,把需要在新的线程中执行的逻辑放到run()方法中,当通过 Threadstart()方法启动线程以后,run()方法就会在新线程中被执行。

如果采用这种方式创建线程,那么当我们有多个不同的逻辑需要并行执行,那么我们需要像上面一样,通过继承的方式创建多个 Thread 的子类,然后重写run()方法来实现。这样看来,我们其实是创建了多个不同类型的线程,然后启动它们。

除了采用继承 Thread 类型,重新run()方法来创建线程,java还提供了一个 java.lang.Runnable 接口来”创建”多线程。准确的说 Runnable 接口并没有创建一个线程,而是通过实现 Runnable 接口定义一个可以被线程执行的任务。

public class SimpleTask implements Runnable {
    @Override
    public void run() {
        // TODO 当前任务中的逻辑
    }
}

Thread 类的构造函数接受一个实现了 Runnable 接口的对象,通过将实现了 Runnable 接口的对象传递给一个 Thread 实例,相当于对这个线程提交了一个任务,当Thread启动以后,提交的任务就会被执行。

public Thread(Runnable target) {
    init(null, target, "Thread-" + nextThreadNum(), 0);
}

可以看到,在 Thread 类中,其中的一个构造函数,就是接受实现了 Runnable 接口的对象,将其视为一个任务执行。通过 Runnable 方式创建线程的完整代码如下:

public class SimpleTask implements Runnable {
    @Override
    public void run() {
        // TODO 当前任务中的逻辑
    }
    
    public static void main(String ...args) throws Exception {
        Thread t = new Thread(new SimpleTask());
        // 启动线程
        t.start();
        
        t.join();
    }
}

这两种方式的区别是,实现 Runnable 的方式更加注重定义的是一个任务,它定义了可以被并发执行的代码边界。作为一个单独的任务,任务中的代码和线程 Thread 之间是松耦合的,在run()中的代码并不关心它是如何被执行的。我们可以把它扔给 Thread,作为一个并行的任务执行,也可以直接调用run()方法。而 Thread 方式创建的线程,其实相当于创建一个特定的线程的目的是为了执行一个任务,更多的是 “创建一个线程”,而前者更多表示的是 “创建一个任务”。一般推荐采用实现 Runnable 接口创建任务的方式。还有一个原因,是提倡面向接口的设计。

内置锁

synchronized关键字

在同一个进程中的所有线程,都可以共享该进程的地址空间。为了保证操作的原子性,需要使用 “锁” 来保证临界区的安全访问。Java提供了一种称为内置锁的机制来保证原子性。Java在语言层面提供了 synchronized 关键字,通过 synchronized 关键字来定义一个同步块。

synchronized (lock_obj) {
    // 同步块
}

一个同步块相当于是一个临界区,只允许一个线程可以进入这个同步块,而其他尝试进入的线程将会被阻塞。其中lock_obj可以是任何一个java对象,在Java中,每个Java对象都可以作为同步块的锁,这个锁就被称为内置锁。通过内置锁,我们可以实现一种称为 监视器模式 的同步模式。

public synchronized void entry() {
    // 同步块
}

当两个线程同时调用一个对象上的entry()方法的时候,将会去争用这个对象的内置锁,只有持有了这个对象上的内置锁的时候,才被允许进入这个方法体,而在当前进入的线程从该方法中返回之前,其他尝试进入该方法的线程将会被阻塞,所以整个方法体其实就是一个临界区。Java提供的这种同步的方式,使得同步代码编写更加方便,代码可读性更高。

可重入

Java的内置锁是可重入的,这意味着,如果同一个线程可以多次持有同一个内置锁而不会被阻塞。

public synchronized void entryA() {
    // 同步块
}
    
public synchronized void entryB() {
    // 再次持有内置锁
    this.entryA();
}

上面的方法entryA()entryB()是在同一个方法中,当在entryB()中调用entryA()的时候,由于当前线程已经持有了该对象的内置锁,由于内置锁是可重入的,所以再次调用entryA()的时候,不会被阻塞。

通知

机制

当一个线程由于一些执行条件不满足而等待,而该执行条件的满足需要别的线程触发的时候,就需要一种通知机制来保证:当条件被满足的时候,可以告知等待这个条件的线程可以继续执行。通知机制是线程间同步的一种方式,可以让多个线程协同工作,保持同步。

Java提供了Object.wait()Object.notify()/Object.notifyAll()方法来实现通知机制。Java的通知机制,需要和Java的内置锁配合使用。

首先,我们创建一个线程 threadA ,让它调用wait()进入等待状态:

Thread threadA = new Thread(new Runnable() {
    @Override
    public void run() {
        synchronized (mutex) {
            System.out.println("Run in Thread A");
            System.out.println("wait");
            try {
                mutex.wait();
            } catch (InterruptedException ex) {
                // some code
            }
            System.out.println("resume from wait");
            System.out.println("exit");
        }
   }
});

上面,我们在持有了 mutex 锁,进入临界区以后,调用mutex.wait()方法让当前的线程进入等待状态。下面,我们需要创建一个线程 threadB 来唤醒线程 threadA

Thread threadB = new Thread(new Runnable() {
    @Override
    public void run() {
        synchronized (mutex) {
            System.out.println("Run in Thread B");
            System.out.println("notify");
            mutex.notify();
            System.out.println("exit");
        }
    }
});

在线程 threadB 中,我们通过调用mutex.notify()方法来唤醒在 mutex 上等待的线程。通知机制的时序图如下:

时序图

Object.wait()方法的语义是:当一个 线程A 持有某个对象 mutex 上的内置锁时,该 线程A 可以调用 mutex.wait()方法让自己阻塞,并且释放持有的 mutex 的内置锁。当 线程B 持有对象 mutex 上的内置锁的时候,可以通过调用mutex.notify()来通知所有在 mutex 内置锁上等待的线程。而 线程A 得到通知以后,需要等到再次持有 mutex 的内置锁的时候,才会从wait()调用中唤醒返回。

Object.notify()将设置锁上的条件,并通知在该锁的条件上等待的其中任意一个线程。当 线程B 退出临界区并释放持有的 mutex锁 以后,线程A再次尝试获取 mutex 锁,并继续执行。

Object.notifyAll()Object.notify()的主要区别是:前者是唤醒在同一个锁上所有在等待的线程,而后者只会唤醒其中的任意一个。不管是前者还是后者,被唤醒并不意味着可以执行,只是表示它们有争夺进入临界区的资格,最终只有一个线程可以进入临界区,而其他被唤醒的线程将会阻塞在内置锁的争夺上,等待锁被释放。

public class WaitNotify {
    private static final Object mutex = new Object();

    public static void main(String ...args) throws Exception {
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (mutex) {
                    System.out.println(Thread.currentThread().getName() + ": Run in Thread A");
                    System.out.println(Thread.currentThread().getName() + ": wait");
                    try {
                        mutex.wait();
                    } catch (InterruptedException ex) {
                        // some code
                    }
                    System.out.println(Thread.currentThread().getName() + ": resume from wait");
                    System.out.println(Thread.currentThread().getName() + ": exit");
                }
            }
        });
        threadA.setName("Thread-A");

        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (mutex) {
                    System.out.println(Thread.currentThread().getName() + ": Run in Thread B");
                    System.out.println(Thread.currentThread().getName() + ": notify");
                    mutex.notify();
                    System.out.println(Thread.currentThread().getName() + ": exit");
                }
            }
        });
        threadB.setName("Thread-B");

        // 启动线程A
        threadA.start();

        // 等待一段时间,保证线程A已经启动,并且已经在等待
        Thread.sleep(1000);

        threadB.start();
    }
}

输出结果:

$ java WaitNotify
Thread-A: Run in Thread A
Thread-A: wait
Thread-B: Run in Thread B
Thread-B: notify
Thread-B: exit
Thread-A: resume from wait
Thread-A: exit

可以从输出中看到,只有当线程 thread-B 从临界区返回,释放了锁以后,线程 thread-A 才得以真正恢复执行。

信号丢失

上面的代码中,我们需要满足 ThreadA 先于 ThreadB 执行的条件,并且在 ThreadA 进入wait()等待以后,才执行 ThreadBnotify(),这样程序才会正确执行。这样做的目的,是为了防止信号丢失。首先,我们来看下,调整 ThreadAThreadB 的执行顺序以后,程序会怎么样。我们让 ThreadB 先执行一段时间后,再执行 ThreadA

public class WaitNotify {
    private static final Object mutex = new Object();

    public static void main(String ...args) throws Exception {
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (mutex) {
                    System.out.println(Thread.currentThread().getName() + ": Run in Thread A");
                    System.out.println(Thread.currentThread().getName() + ": wait");
                    try {
                        mutex.wait();
                    } catch (InterruptedException ex) {
                        // some code
                    }
                    System.out.println(Thread.currentThread().getName() + ": resume from wait");
                    System.out.println(Thread.currentThread().getName() + ": exit");
                }
            }
        });
        threadA.setName("Thread-A");

        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (mutex) {
                    System.out.println(Thread.currentThread().getName() + ": Run in Thread B");
                    System.out.println(Thread.currentThread().getName() + ": notify");
                    mutex.notify();
                    System.out.println(Thread.currentThread().getName() + ": exit");
                }
            }
        });
        threadB.setName("Thread-B");

        threadB.start();

        // 等待一段时间,保证线程B已经运行一段时间
        Thread.sleep(1000);

        // 启动线程A
        threadA.start();
    }
}

输出结果:

$ java WaitNotify
Thread-B: Run in Thread B
Thread-B: notify
Thread-B: exit
Thread-A: Run in Thread A
Thread-A: wait

上面的代码,加粗部分是改动后的代码,我们交换了 ThreadAThreadB 的启动顺序,保证 ThreadAwait()前,ThreadB 已经notify()过了。可以看到,当输出 Thread-A: wait 以后,会发现程序并没有退出,而是被阻塞而无法退出。为什么呢?我们可以通过 jstack 工具看下当前JVM的调用栈,可以发现如下的输出:

$ jstack process_num
"Thread-A" #12 prio=5 os_prio=31 tid=0x00007faba4073800 nid=0x5207 in Object.wait() [0x00007000013cf000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x0000000795778bf8> (a java.lang.Object)
    at java.lang.Object.wait(Object.java:502)
    at WaitNotify$1.run(WaitNotify.java:15)
    - locked <0x0000000795778bf8> (a java.lang.Object)
    at java.lang.Thread.run(Thread.java:745)

可以看到,ThreadA 当前正阻塞在wait()调用上,它在等他条件发生。但是在我们的程序中,条件永远也不会发生了,因为 ThreadB 已经通知过了(但是丢失了),并且退出了,所以 ThreadA 将会永远等待下去。

由于Java没有在通知以后保留信号,所以一旦在等待之前通知已经发出,那么先前发出的信号就会被丢失,这就导致了上面的问题。为了防止信号被丢失,我们需要在写代码的时候,需要在通知发生的时候保留下来这个信号。通常,可以使用使用一个单独的二值状态变量来记录信号,可以是 int 类型的0/1,或者 boolean 类型的 true/false

现在,我们来改写下原先的代码,让它可以处理信号丢失的问题:

public class WaitNotify {
    private static final Object mutex = new Object();

    // 标记条件是否满足 0-为满足;1-满足
    private static int status = 0;

    public static void main(String ...args) throws Exception {
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (mutex) {
                    System.out.println(Thread.currentThread().getName() + ": Run in Thread A");
                    System.out.println(Thread.currentThread().getName() + ": wait");

                    // 在进入等待前,检查条件是否满足
                    if (status == 0) {
                        try {
                            mutex.wait();
                        } catch (InterruptedException ex) {
                            // some code
                        }
                    }
                    System.out.println(Thread.currentThread().getName() + ": resume from wait");
                    System.out.println(Thread.currentThread().getName() + ": exit");
                }
            }
        });
        threadA.setName("Thread-A");

        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (mutex) {
                    System.out.println(Thread.currentThread().getName() + ": Run in Thread B");
                    System.out.println(Thread.currentThread().getName() + ": notify");
                    // 记录信号已经发生
                    status = 1;

                    mutex.notify();
                    System.out.println(Thread.currentThread().getName() + ": exit");
                }
            }
        });
        threadB.setName("Thread-B");

        threadB.start();

        // 等待一段时间,保证线程B已经运行一段时间
        Thread.sleep(1000);

        // 启动线程A
        threadA.start();
    }
}

输出结果:

$ java WaitNotify
Thread-B: Run in Thread B
Thread-B: notify
Thread-B: exit
Thread-A: Run in Thread A
Thread-A: wait
Thread-A: resume from wait
Thread-A: exit

这次,我们使用一个 status 变量来保存信号。当我们需要wait()的时候,先检查下 status 状态是否已经被设置,如果被设置,则表示信号已经发生过了,不需要再次调用wait(),而在notify()之前,我们先把信号保留到 status 字段中,这样就保证信号不会丢失。现在,程序可以正确执行了。

意外唤醒

现在,我们的代码可以正确执行了,好像没有问题了。但是,在我们调用wait()阻塞期间,JVM有可能会意外地唤醒阻塞的线程,这被称为 Spurious wakeup

Spurious wakeup describes a complication in the use of condition variables as provided by certain multithreading APIs such as POSIX Threads and the Windows API.

由于存在意外唤醒,所以,如果我们在wait()等待条件发生的时候,线程被意外唤醒了,那么我们的代码就应该重新进入wait()状态而不是继续执行。为了处理意外唤醒的情况,我们还需要对 ThreadA 的代码做一些改动:

Thread threadA = new Thread(new Runnable() {
    @Override
    public void run() {
        synchronized (mutex) {
            System.out.println(Thread.currentThread().getName() + ": Run in Thread A");
            System.out.println(Thread.currentThread().getName() + ": wait");
            while (status == 0) {
                try {
                    mutex.wait();
                } catch (InterruptedException ex) {
                    // some code
                }
            }
            System.out.println(Thread.currentThread().getName() + ": resume from wait");
            System.out.println(Thread.currentThread().getName() + ": exit");
        }
    }
});

我们将if (status == 0) 改成了while(status == 0),这样,当被唤醒的时候,我们可以判断是否是因为条件满足而被唤醒的,还是被意外唤醒的。

中断

一个线程启动以后,因为一些情况,我们需要让在跑的线程停止运行,比如某个在运行的任务被临时取消。我们需要一种机制,可以让已经启动的线程中途退出。Java提供了一种称为”中断”的机制,来告知运行中的线程终止运行。Java的 Thread 类中,下面的三个方法和中断相关。

public class Thread implement Runnable {
    public static boolean interrupted() {}
    public void interrupt() {}
    public boolean isInterrupted() {}
}

当需要中断一个正在运行的线程的时候,可以通过Thread.interrupt()向需要被中断的线程发送中断请求。然后,被中断的线程将会设置线程的中断状态,如果被中断的线程当前正处于阻塞状态,并且当前的阻塞状态是可被中断的,那么在当前阻塞的地方会抛出一个 InterruptedException 异常,线程会从阻塞状态唤醒,并且线程的中断状态会消除(这是为下一次中断做准备)。如果被中断的线程处于运行状态,那么外部发起的中断请求只会设置当前线程的中断状态,标记该线程被中断了,但是并不会发生任何事情,除非被中断的线程将要被挂起或者该线程主动去检查中断状态。

如果一个线程当前的中断状态被设置了,那么当改线程将要从运行状态被挂起的时候,会检查当前的中断状态是否被设置,如果当前的中断状态被设置(改线程已经被请求中断),那么该线程不会被挂起,而是抛出一个 InterruptedException ,并重置中断状态。这种处理机制的目的,是为了防止中断信号丢失。设想,如果不检查中断状态而将线程挂起,那么有可能这个被挂起的线程将会永远无法退出,除非后面再次收到中断请求,但是再次收到中断请求的情况是未知的,所以在被挂起前对中断状态做检查是必要的。

通过对当前线程调用Thread.isInterrupted()可以检查当前线程的中断状态是否被设置。这是运行中的线程检查中断状态的一种途径,使用这种方式来检查线程的中断状态的好处是,只有运行中的线程才知道当前是否是一个合适的执行点终止改线程,当代码需要检查中断状态的时候,一般意味着当前是一个可以退出执行的安全的执行点。

Thread.isInterrupted()方法接受一个 boolean 参数,表示是否清除中断状态标记,true 表示检查完以后清除中断标记,false 表示保留中断标记(用于在下一个检查点判断)。Thread.interrupted()方法是一个静态方法,它判断当前线程是否处于被中断状态,并且清除当前的中断状态。改静态方法是对Thread.isInterrupted()方法的包装。

public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}

Java的中断机制,实际上并没有真正中断一个线程。Java的中断以一种相对比较温和的方式中断线程,它以合作的方式来中断一个线程。不管是通过抛出异常或者通过线程自觉检查中断状态,对于被中断的线程而言,当它收到中断请求以后,它可以选择在合适(安全)的执行点响应这个中断请求,也可以丢弃这个中断请求,继续执行,这取决于线程代码编写者的意图。一般建议在线程代码编写的时候,应该对中断请求做出相应或者将中断状态存储下来,以便在下一个合适的执行点,对这个中断请求做出响应,而不应该简单得忽视这个中断请求。除非忽视这个中断是允许,并且这个中断请求是意料之中,可以被忽略的,才可以丢弃这个中断状态。

结束语

通过上面的介绍,我们已经大致了解了在Java中编写多线程代码的基本知识。我们从如何在Java中创建一个线程开始,学习了如何用Java提供的内置锁来实现同步。讨论了如何通过通知机制进行线程间通信,简单探讨了通知机制中信号的丢失和意外唤醒问题,并给出了简单的解决方案。

TOP