任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、notify()、notifyAll()等方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式,具体详见《Java线程协作:wait/notify》。
Condition是java.util.concurrent.locks包中的一个接口,它也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,替代Object.wait()/notify()。与Object.wait()/notify()相比,Condition提供了更高级的功能,能够创建多个条件变量,每个条件变量都有自己的等待队列,使得线程管理更加灵活和精确。
Condition 使用介绍
Condition对象是由Lock对象创建出来的(调用Lock对象的newCondition()方法)。当前线程调用Condition的等待/通知方法时,需要提前获取到Condition对象关联的锁。当调用await()方法后,当前线程会释放锁并在此等待;而其他线程调用Condition对象的signal()方法通知当前线程,并且当前线程重新竞争成功获取到同步锁后,当前线程才会从await()方法返回。
以生产者-消费者模型为例,使用Condition实现如下:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ProducerConsumerExample { private final Lock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); // 缓冲区非空条件 private final Condition notFull = lock.newCondition(); // 缓冲区未满条件 private final int[] buffer = new int[5]; private int count = 0; // 当前元素数量 private int putIndex = 0; private int takeIndex = 0; // 生产者方法 public void produce(int item) throws InterruptedException { lock.lock(); try { // 使用 while 而不是 if 判断条件,防止虚假唤醒 while (count == buffer.length) { // 缓冲区满,等待 notFull 条件 // await() 会自动释放锁,并在被唤醒后重新获取锁 notFull.await(); } buffer[putIndex] = item; putIndex = (putIndex + 1) % buffer.length; count++; System.out.println("Produced: " + item + ", count=" + count); // 通知消费者:缓冲区非空 // signal() 只唤醒一个等待线程,signalAll() 则唤醒所有 notEmpty.signal(); } finally { lock.unlock(); } } // 消费者方法 public int consume() throws InterruptedException { lock.lock(); try { while (count == 0) { // 缓冲区空,等待 notEmpty 条件 // await() 会自动释放锁,并在被唤醒后重新获取锁 notEmpty.await(); } int item = buffer[takeIndex]; takeIndex = (takeIndex + 1) % buffer.length; count--; System.out.println("Consumed: " + item + ", count=" + count); // 通知生产者:缓冲区未满 notFull.signal(); return item; } finally { // 在finally块中释放锁,确保异常情况下也能解锁 lock.unlock(); } } // 测试 public static void main(String[] args) { ProducerConsumerExample pc = new ProducerConsumerExample(); Thread producer = new Thread(() -> { for (int i = 1; i <= 10; i++) { try { pc.produce(i); Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); Thread consumer = new Thread(() -> { for (int i = 1; i <= 10; i++) { try { pc.consume(); Thread.sleep(150); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); producer.start(); consumer.start(); } }Condition 原理
在 JUC (java.util.concurrent)并发包中,Condition并不是一个孤立的组件,而是深度集成于Lock接口和 AQS(AbstractQueuedSynchronizer)框架中的关键协作机制。
ConditionObject实现了Condition接口,是同步器AbstractQueuedSynchronizer的内部类。每个Condition对象都包含一个单向等待队列,等待队列是一个 FIFO 的单向队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程:
在Object的监视器模型上,一个对象拥有一个同步队列和一个等待队列(详见Java并发:synchronized原理详解)。而 JUC 并发包中的 Lock AQS 同步器则拥有一个同步队列和多个等待队列,同步队列和等待队列中的节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node:
await() 等待过程
当前线程(已获取到同步锁)调用Condition的await()方法(或者以 await 开头的方法),会使当前线程构造新节点加入等待队列尾部,并释放同步锁,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
从队列(同步队列和等待队列)的角度看await()方法,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列尾部。
await() 源码如下(JDK 8):
public final void await() throws InterruptedException { // 入口检查中断,若已中断,立即抛异常。 if (Thread.interrupted()) throw new InterruptedException(); // 1、将当前线程加入 Condition 等待队列 AbstractQueuedLongSynchronizer.Node node = addConditionWaiter(); // 2、完全释放锁,并保存锁状态 long savedState = fullyRelease(node); int interruptMode = 0; // 3、循环阻塞,直到被 signal(节点移入同步队列)或中断。 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 4、在同步队列中重新获取锁。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清理取消节点(防内存泄漏) if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }signal() 通知过程
当前线程(必须已获取同步锁)调用Condition的signal()方法,将会获取在等待队列中等待时间最长的节点(首节点),将其移到同步队列中,并使用LockSupport唤醒节点中的线程,开始尝试竞争获取同步状态。成功竞争获取到同步状态(同步锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了同步锁。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。
Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程去重新竞争获取同步锁。
从队列(同步队列和等待队列)的角度看signal()方法,相当于从等待队列取出头节点,重新加入同步队列竞争锁。
signal() 源码如下(JDK 8):
public final void signal() { // 检查调用者是否持有锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取等待队列的头节点 // firstWaiter 是 ConditionObject 内部字段,指向 Condition 等待队列的第一个节点 AbstractQueuedLongSynchronizer.Node first = firstWaiter; // 若等待队列非空,则执行唤醒 if (first != null) doSignal(first); } private void doSignal(AbstractQueuedLongSynchronizer.Node first) { do { // 从等待队列中移除头节点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 尝试将 first 节点从等待队列转移到AQS的同步队列 // 被转移的线程将在同步队列中重新竞争锁,并在获取锁后才从 await() 返回 // 一旦成功转移一个有效节点,循环结束 } while (!transferForSignal(first) && (first = firstWaiter) != null); }Object Monitor 与 Condition 对比
| 对比维度 | Object 监视器(synchronized + wait/notify) | Condition(ReentrantLock + Condition) |
| 所属机制 | Java 内置监视器(JVM 层原生支持) | java.util.concurrent.locks 包提供的显式锁机制 |
| 锁获取方式 | 隐式:通过 synchronized 关键字自动加锁/释放 | 显式:需手动调用 lock() / unlock() |
| 条件等待方法 | wait(), wait(long), wait(long, int) | await(), awaitNanos(), awaitUntil(), awaitUninterruptibly() |
| 唤醒方法 | notify(), notifyAll() | signal(), signalAll() |
| 条件队列数量 | 每个对象只有 1 个 等待队列 | 每个 Lock 可创建 多个独立的 Condition,每个对应一个等待队列 |
| 中断响应 | wait() 可被中断,抛出 InterruptedException | 提供多种选择: • await():可中断 • awaitUninterruptibly():不可中断 • 支持超时+中断组合 |
| 锁释放行为 | 调用 wait() 时自动释放锁,被唤醒后自动重新获取 | 调用 await() 时完全释放锁(包括重入次数),返回前重新竞争获取 |
| 使用前提 | 必须在 synchronized 块内调用 wait/notify | 必须在持有 Lock 的情况下调用 await/signal |
| 异常安全性 | 自动释放锁,无需担心忘记释放 | 必须在 finally 块中调用 unlock(),否则可能死锁 |
| 性能与可扩展性 | JVM 优化较好,但功能受限 | 更灵活,适合复杂并发场景;AQS 底层高效,支持自定义同步器 |
| 公平性支持 | 不支持公平锁 | ReentrantLock 可配置为公平锁(FairSync) |
| 典型应用场景 | 简单线程协作(如基础生产者-消费者) | 复杂多条件协作(如阻塞队列、读写分离条件等) |
| 底层实现 | JVM 的 ObjectMonitor(C++ 实现) | 基于 AQS 的 ConditionObject(Java 实现) |