Java中断机制
2018-09-20 | 分类 Multithread | 标签 Java Multithread

什么是中断

中断,顾名思义,就是终止,打断的意思。

当我们的程序在执行过程中,在没有外界干预的前提下,如果程序执行过程中不出错,那么程序就可以一直执行下去,直到程序结束退出。

当我们在写多线程代码的时候,因为某些原因,可能是出现了错误,我们需要让运行中的线程停下来,这个时候,中断机制就可以派上。

接下来,我将介绍在Java中是如何产生一个中断的,以及Java的中断机制。本文假设您具有Java多线程的基础知识。

创建一个线程

在产生一个中断之前,我们首先需要创建一个线程。在Java中,创建线程的方式有两种。一种是通过继承Thread类来创建一个线程对象。

public class MyThread extends Thread {
    @Override
    public void run() {
        // TODO target code
    }
    
    public static void main(String ...args) {
        new MyThread().start();
    }
}

另一种是通过实现一个Runnable接口来定义个任务,然后由线程去执行这个任务。

// Task class
public class Task implement Runnable {
    public void run() {
    	// TODO target code        
    }
    
    public static void main(String ...args) {
        new Thread(new Task()).start();
    }
}

产生一个中断

在Java中,可以通过Thread的成员方法interrupt()来对一个线程发起中断。当我们持有一个线程对象的时候,我们就可以通过threadObject.interrupt()来中断threadObject这个线程对象所指的那个线程。

public class MyThread extends Thread {
    public MyThread(String name) {
        super(name);
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);    // <---- ⑤
        } catch(InterruptedException ex) {  // <---- ⑥
            System.out.println(Thread.currentThread().getName() + ": Interrupted");
        }
    }

    public static void main(String ...args) throws Exception {
        Thread myThread = new MyThread("MyThread");    // <---- ①
        myThread.start();    // <---- ②
        myThread.interrupt();    // <---- ③
        myThread.join();    // <---- ④
    }
}

运行结果:

$ java MyThread
MyThread: Interrupted

上面代码就演示了一次中断过程。在①处我们首先创建了一个线程名为MyThread的线程,然后在②处我们调用它的start()方法启动该线程。在MyThread这个线程的run()方法中,我们通过Thread.sleep(1000)让线程挂起1秒种,目的是让我们③处的代码执行的时候,MyThread这个线程还没有执行完。在run()方法中,我们通过try-catch语句捕获一个中断异常。当③处的代码对MyThread发起一个中断以后,⑥处的catch语句会捕获这个异常,并且在终端输出中断信息。

中断检查

既然Java中引入了中断的机制,那么当程序在执行过程中,什么时候会响应中断的请求呢?是不是程序一收到中断请求,程序就立即响应中断请求?当程序收到中断请求以后,程序的行为是怎么样的呢? 带着这些疑问,我们来看下Java中的中断机制是如何工作的。

首先,Java的中断以协作的方式工作的。当一个线程A对线程B发起一个中断请求以后,B线程何时中断,中断以后做什么,都是由线程B控制的。对于线程A来说,它发出的中断不是命令式的,而是请求式的。换句话说,线程B完全可以在程序中忽略A发起的中断请求而继续执行(运行良好的多线程程序,应该正确合理得响应中断),中断请求是否被处理取取决于线程B的代码实现。

在Java中,每个线程对象都有一个状态标记,用于表示线程的中断状态。通过Thread.currentThread().isInterrupted()可以检查当前线程的中断标记位是否被设置。如果当前线程收到了一个中断请求,那么该方法返回True,否则返回False。

public class MyThread extends Thread {
    public MyThread(String name) {
        super(name);
    }

    @Override
    public void run() {
        while (true) {
            if (Thread.currentThread().isInterrupted()) {  // <-- ①
                System.out.println("Interrupted");
                break;
            }
        }
    }

    public static void main(String ...args) throws Exception {
        Thread myThread = new MyThread("MyThread");
        myThread.start();
        myThread.interrupt();
        myThread.join();
    }
}

运行结果:

$ java MyThread
Interrupted

除了用Thread.currentThread().isInterrupted()去检查线程的中断状态,Thread类中还包含了一个Thread.interrupted()静态方法,可以判断当前线程是否被中断。Thread.interrupted()方法和Thread.currentThread().isInterrupted()方法最大的区别是:调用前者以后,线程的中断标记被会重置,而后者只是检查线程的中断标记,并不影响中断标志的值。

public static void main(String ...args) throws Exception {
    Thread t = new Thread(() -> {
        while (true) {
            if (Thread.interrupted()) {
                // 中断标记被清除
                System.out.println("Interrupted status: " + Thread.currentThread().isInterrupted());
                break;
            }
        }
    });

    t.start();

    // 阻塞1秒,保证线程t已经启动的情况下发送中断请求
    TimeUnit.SECONDS.sleep(1);

    t.interrupt();

    t.join();
}

输出结果:

$ java MyThread
Interrupted status: false

InterruptedException异常

除了通过在程序中主动检查异常中断标记,如果程序阻塞在一些阻塞调用上,那么如何感知到线程被中断呢?Java中有一个InterruptedException异常,当线程调用一些阻塞调用被挂起的时候,如果线程的中断标记被设置,那么线程将从阻塞状态唤醒,抛出InterruptedException异常。一旦抛出InterruptedException异常,那么线程的中断状态将被重置。

public static void main(String ...args) throws Exception {
    Thread t = new Thread(() -> {
        try {
            while (true) {
                TimeUnit.SECONDS.sleep(1000);
            }
        } catch (InterruptedException ex) {
            System.out.println("Catch InterruptedException, interrupted status: " + Thread.currentThread().isInterrupted());
        }
    });

    t.start();

    t.interrupt();

    t.join();
}

运行结果:

$ java MyThread
Catch InterruptedException, interrupted status: false

InterruptedException异常清除中断标记

前面提到,抛出InterruptedException以后,会清除中断标记。准确的讲,只有处于阻塞状态下抛出的InterruptedException异常才会清空线程当前的中断状态。在非阻塞状态下,即使线程的中断状态被设置,主动抛出一个中断异常也不会清楚中断状态。

public static void main(String ...args) throws Exception {
    Thread t = new Thread(() -> {
        try {
            while (true) {
                if (Thread.currentThread().isInterrupted()) {
                    // 当线程的中断状态被设置以后,主动抛出一个中断异常
                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException ex) {
            System.out.println("Catch interrupted exception, interrupted status: " + Thread.currentThread().isInterrupted());
        }
    });

    t.start();

    TimeUnit.SECONDS.sleep(1);

    t.interrupt();

    t.join();
}

输出结果:

$ java MyThread
Catch interrupted exception, interrupted status: true

可以看到,中断标记并没有因为抛出了中断异常而清除。

锁和中断

当多个线程在竞争锁的时候,获取不到锁的线程会阻塞在获取锁的操作上。如果线程在获取锁的时候阻塞了,当线程被中断以后线程的行为是怎么样的呢?我们知道,在Java中锁的实现可以粗略分为两种:synchronized内置锁和java.util.concurrent包里面Lock锁。

下面我们看下两种锁实现对中断是如何处理的。

synchronized内置锁

synchronized内置锁是Java语言层面支持的锁实现。当一个线程阻塞在获取synchronized内置锁的时候被中断了,那么线程将继续阻塞在获取锁的操作上,直到获取到锁为止。

private static final Object lock = new Object();

public static void main(String... args) throws Exception {
    Thread t1 = new Thread(() -> {
        synchronized (lock) {
            System.out.println("T1 enter block");
            try {
                // 模拟长时间持有锁的场景
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            } finally {
                System.out.println("T1 exit block");
            }
        }
    });

    Thread t2 = new Thread(() -> {
        try {
            // 保证t1能先持有锁,模拟t2阻塞等待锁的场景
            TimeUnit.SECONDS.sleep(1);
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        // 阻塞等待t1释放锁
        synchronized (lock) {
            System.out.println("T2 enter block, current thread interrupt status: " +
                    Thread.currentThread().isInterrupted());
            System.out.println("T2 exit block");
        }
    });

    t1.start();
    t2.start();

    TimeUnit.SECONDS.sleep(2);

    t2.interrupt();

    t1.join();
    t2.join();
}

输出结果:

$ java MyThread
T1 enter block
T1 exit block

T2 enter block, current thread interrupt status: true
T2 exit block

可以看到,在等待持有synchronized内置锁的时候,即使线程被中断,也不会从阻塞状态唤醒。只有等到锁可用,持有锁的时候才会从阻塞状态恢复。通过检查线程的中断状态可以看到线程持有锁的时候中断状态是被设置了的。

Lock锁

Java并发包里的Lock锁实现主要是ReentrantLock。当使用JUC包里的锁保护临界区的时候,行为和synchronized内置锁一致:阻塞在等待锁上面的线程不会被中断唤醒。

private static final ReentrantLock lock = new ReentrantLock();

public static void main(String... args) throws Exception {
    Thread t1 = new Thread(() -> {
        try {
            lock.lock();
            System.out.println("T1 enter block");
            try {
                // 模拟长时间持有锁的场景
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            } finally {
                System.out.println("T1 exit block");
            }
        } finally {
            lock.unlock();
        }
    });

    Thread t2 = new Thread(() -> {
        try {
            // 保证t1能先持有锁,模拟t2阻塞等待锁的场景
            TimeUnit.SECONDS.sleep(1);
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            // 阻塞等待t1释放锁
            lock.lock();
            System.out.println("T2 enter block, current thread interrupt status: " +
                    Thread.currentThread().isInterrupted());
            System.out.println("T2 exit block");
        } finally {
            lock.unlock();
        }
    });

    t1.start();
    t2.start();

    TimeUnit.SECONDS.sleep(2);

    t2.interrupt();

    t1.join();
    t2.join();
}

输出结果:

$ java MyThread
T1 enter block
T1 exit block

T2 enter block, current thread interrupt status: true
T2 exit block

Lock的实现里,有一个Lock.lockInterruptibly()方法可以实现可以被中断的锁。当使用Lock.lockInterruptibly()等待持有锁的时候,如果线程被中断,将会抛出一个InterruptedException异常。

阻塞IO和中断

Java默认的IO实现是阻塞IO(BIO)。当进行IO操作的时候,如果数据没有准备好,对输入输出流进行读写的时候会阻塞在read()write()上。

典型的场景是对 套接字(Socket) 进行读写。如果网络上数据没有准备好,客户端或者服务端会阻塞在数据请求操作上:

public static void main(String... args) throws Exception {
    Thread t = new Thread(() -> {
        try {
            Socket socket = new Socket("127.0.0.1", 65534);
            System.out.println("Read data from peer: 127.0.0.1/65534");

            byte[] data = new byte[1024];

            // 阻塞在read操作上
            int len = socket.getInputStream().read(data);
            if (len > 0) {
                System.out.println("Recv: " + new String(data, 0, len, Charset.forName("utf8")));
            }

            System.out.println("Interrupted status: " + Thread.currentThread().isInterrupted());
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    });

    t.start();

    // 保证线程t阻塞在read操作上
    TimeUnit.SECONDS.sleep(2);

    // 发送中断
    t.interrupt();
    System.out.println("Send interrupt to thread t successful");

    t.join();
}

首先使用nc命令开启一个监听在65534端口上的服务端程序:

$ nc -l 65534

然后启动上面的程序,可以看到如下输出:

$ java MyThread
Read data from peer: 127.0.0.1/65534
Send interrupt to thread t successful

上面的输出可以看到,t1阻塞在read操作上,然后在主线程中通过t.interrupt()发送一个中断给线程t。可以看到,线程t并没有中断。然后在nc启动的服务端输入hello,使得程序可以从read操作返回,并检查线程的中断状态,输出如下:

$ nc
...
Recv: hello

Interrupted status: true

可以看到,线程t的中断状态被设置了。

NIO中实现了支持中断的IO,所有实现了 InterruptibleChannel 接口的 Channel 支持异步关闭和中断。当线程阻塞在read操作上的时候,通过线程中断,阻塞在read操作上的线程会抛出 ClosedByInterruptException

阻塞前被中断

一种场景是:线程在调用阻塞方法之前,线程已经被中断了(线程的中断标记被设置)。这个时候如果线程尝试进入阻塞状态,那么会抛出 InterruptedException 异常。对于可以被中断的Channel IO,会抛出 ClosedByInterruptException

一个sleep阻塞的例子:

public static void main(String... args) throws Exception {
    Thread t = new Thread(() -> {
        try {
            // 保证线程的中断标记被设置
            while (!Thread.currentThread().isInterrupted()) {
                // Spin
            }

            System.out.println("Thread interrupt status: " + Thread.currentThread().isInterrupted());

            // 在带有中断标记的情况下尝试调用sleep操作阻塞
            Thread.sleep(1);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    });

    t.start();

    // 保证线程t阻塞在read操作上
    TimeUnit.SECONDS.sleep(2);

    // 发送中断
    t.interrupt();
    System.out.println("Send interrupt to thread t successful");

    t.join();
}

输出结果:

$ java MyThread
Send interrupt to thread t successful
Thread interrupt status: true
java.lang.InterruptedException: sleep interrupted
  at java.lang.Thread.sleep(Native Method)
  at org.sample.Java8Feature.lambda$main$0(Java8Feature.java:21)
  at java.lang.Thread.run(Thread.java:745)

可以看到,当尝试sleep的时候,线程抛出 InterruptedException。下面是一个可被中断IO的例子:

public static void main(String... args) throws Exception {
    Thread t = new Thread(() -> {
        try {
            SocketAddress sa = new InetSocketAddress("127.0.0.1", 65534);
            SocketChannel channel = SocketChannel.open(sa);
            System.out.println("Read data from peer: 127.0.0.1/65534");

            ByteBuffer buffer = ByteBuffer.allocate(1024);

            // 保证线程的中断标记被设置
            while (!Thread.currentThread().isInterrupted()) {
                // Spin
            }

            // 带有中断标记的线程尝试进入阻塞状态
            int len = channel.read(buffer);
            if (len > 0) {
                System.out.println("Recv: " + new String(buffer.array(), 0, len, Charset.forName("utf8")));
            }

            System.out.println("Interrupted status: " + Thread.currentThread().isInterrupted());
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    });

    t.start();

    // 保证线程t阻塞在read操作上
    TimeUnit.SECONDS.sleep(2);

    // 发送中断
    t.interrupt();
    System.out.println("Send interrupt to thread t successful");

    t.join();
}

通过nc启动监听在65534端口上的tcp服务程序,观察输出结果:

$ java MyThread
Read data from peer: 127.0.0.1/65534
Send interrupt to thread t successful
java.nio.channels.ClosedByInterruptException
  at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:407)
  at org.sample.Java8Feature.lambda$main$0(Java8Feature.java:32)
  at java.lang.Thread.run(Thread.java:745)

可以看到,当线程t在带有中断标记的情况下阻塞在read操作上的时候,会抛出 ClosedByInterruptException 异常。

TOP