05-线程间的通信

线程之间协调工作的方式

基于等待通知模型的通信

等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超类java.lang.Object上。

相关API

  • notify: 通知一个对象上等待的线程,使其从wait方法返回,而返回的前提是该线程获取到了对象的锁

  • notifyAll: 通知对象上所有等待的线程,使其从wait方法返回

  • wait: 使线程进入WAITING(后面线程的生命周期讲)状态,只有等待另一个线程通知或者被中断才返回,需要注意的是,调用wait方法后需要释放对象的锁

  • wait(long): 和wait类似,加入了超时时间,超时了还没被通知就直接返回

  • wait(long, int): 纳秒级,不常用

一些需要注意的点:

  • 使用wait()、notify()和notifyAll()时需要先对调用对象加锁

  • 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列,释放锁

  • notify()或notifyAll()方法调用后,等待线程不会立即从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。

  • notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。

  • 从wait()方法返回的前提是获得了调用对象的锁。

关于等待队列和同步队列

  • 同步队列(锁池):假设线程A已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用这个对象的某个synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前必须先获得该对象的锁的拥有权,但是该对象的锁目前正被线程A拥有,所以这些线程就进入了该对象的同步队列(锁池)中,这些线程状态为Blocked

  • 等待队列(等待池):假设一个线程A调用了某个对象的wait()方法,线程A就会释放该对象的锁(因为wait()方法必须出现在synchronized中,这样自然在执行wait()方法之前线程A就已经拥有了该对象的锁),同时 线程A就进入到了该对象的等待队列(等待池)中,此时线程A状态为Waiting。如果另外的一个线程调用了相同对象的notifyAll()方法,那么 处于该对象的等待池中的线程就会全部进入该对象的同步队列(锁池)中,准备争夺锁的拥有权。如果另外的一个线程调用了相同对象的notify()方法,那么 仅仅有一个处于该对象的等待池中的线程(随机)会进入该对象的同步队列(锁池)。

以上来自啃碎并发(二):Java线程的生命周期

等待通知模型的示例

class WaitNotifyModel {
    Object lock = new Object();
    boolean flag = false;

    public void start() {
        Thread A = new Thread(() -> {
            synchronized (lock) {
                while (!flag) {
                    try {
                        System.out.println(Thread.currentThread().getName()+":等待通知");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName()+ ":收到通知,处理业务逻辑");
            }
        });
        A.setName("我是等待者");
        Thread B = new Thread(() -> {
            synchronized (lock) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                flag = true;
                System.out.println(Thread.currentThread().getName()+":发出通知");
                lock.notify();
            }
        });
        B.setName("通知者");
        A.start();
        B.start();
    }
}

模型

等待者

 synchronized (对象) {
    while (不满足条件) {
        对象.wait()
    }
    处理业务逻辑
}

通知者

synchronized (对象) {
    改变条件
    对象.notify();
}

基于Condition的通信

上述的这种等待通知需要使用synchronized, 如果使用Lock的话就要用Condition

Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式

Condition与Object监视器的区别

项目

Object的监视器方法

Condition

前置条件

获得对象的锁

Lock.lock()获取锁 Lock.newCondition()获取Condition

调用方式

obj.wait()

condition.await()

等待队列个数

一个

可以多个

当前线程释放锁并进入等待状态

支持

支持

等待状态中不响应中断

不支持

支持

释放锁进入超时等待状态

支持

支持

进入等待状态到将来的某个时间

不支持

支持

唤醒等待中的一个或多个线程

支持 notify notifyAll

支持signal signalAll

这里有一些线程的状态,可以看完后边的线程的生命周期再回过头看看

示例

一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

实现一个有界队列,当队列为空时阻塞消费线程,当队列满时阻塞生产线程

class BoundList<T> {
    private LinkedList<T> list;
    private int size;
    private Lock lock = new ReentrantLock();
    // 拿两个condition,一个是非空,一个是不满
    private Condition notEmpty = lock.newCondition();
    private Condition notFullCondition = lock.newCondition();

    public BoundList(int size) {
        this.size = size;
        list = new LinkedList<>();
    }

    public void push(T x) throws InterruptedException {
        lock.lock();
        try {
            while (list.size() >= size) {
                // 满了就等待
                notFullCondition.await();
            }
            list.push(x);
            // 唤醒等待的消费者
            notEmpty.signalAll();

        } finally {
            lock.unlock();
        }
    }

    public T get() throws InterruptedException {
        lock.lock();
        try {
            while (list.isEmpty()) {
                // 空了就等
                notEmpty.await();
            }
            T x = list.poll();
            // 唤醒生产者
            notFullCondition.signalAll();
            return x;
        } finally {
            lock.unlock();
        }
    }

}

public class Demo_02_05_1_Condition {
    public static void main(String[] args) {
        BoundList<Integer> list = new BoundList<>(10);
        // 生产数据的线程
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    Thread.sleep(1000);
                    list.push(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 消费数据的线程
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    System.out.println(list.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

基于BlockingQueue实现线程通信

这里列个标题,后边说到了BlockingQueue就明白怎么做了。

可以参看第05章的相关内容

最后更新于