言:为什么我们需要并发容器?
想象一下传统的超市结账场景:只有一个收银台,所有人排成一队,效率低下。这就是传统集合在多线程环境下的写照。
而现代并发容器就像拥有多个收银台的智能超市:
多个收银台同时工作
智能分配顾客到不同队列
收银员之间互相协助
在Java并发世界中,我们有三大法宝:
ConcurrentHashMap - 智能分区的储物柜系统
ConcurrentLinkedQueue - 无锁的快速通道
阻塞队列 - 有协调员的等待区
Fork/Join框架 - 团队协作的工作模式
让我们一一探索它们的魔力!
1. ConcurrentHashMap:智能分区的储物柜系统
1.1 传统Map的问题:独木桥的困境
// 传统HashMap在多线程环境下就像独木桥
public class HashMapProblem {
public static void main(String[] args) {
Map<String, String> map = new HashMap<>();
// 多个线程同时操作HashMap,就像多人同时过独木桥
// 结果:有人掉水里(数据丢失),桥塌了(死循环)
}
}
1.2 ConcurrentHashMap的解决方案:多车道高速公路
分段锁设计:把整个Map分成多个小区域,每个区域独立加锁
ConcurrentHashMap架构:
├── 区域1 (锁1) → 储物柜组1
├── 区域2 (锁2) → 储物柜组2
├── 区域3 (锁3) → 储物柜组3
└── ...
核心优势:
写操作只锁住对应的区域,其他区域仍可读写
读操作基本不需要加锁
大大提高了并发性能
1.3 实战示例:高性能缓存系统
/**
* 基于ConcurrentHashMap的高性能缓存
* 像智能储物柜系统,支持高并发存取
*/
public class HighPerformanceCache<K, V> {
private final ConcurrentHashMap<K, CacheEntry<V>> cache =
new ConcurrentHashMap<>();
// 获取或计算缓存值(线程安全且高效)
public V getOrCompute(K key, Supplier<V> supplier) {
return cache.computeIfAbsent(key, k ->
new CacheEntry<>(supplier.get())).getValue();
}
// 批量获取,利用并发特性
public Map<K, V> getAll(Set<K> keys) {
Map<K, V> result = new HashMap<>();
keys.forEach(key -> {
CacheEntry<V> entry = cache.get(key);
if (entry != null && !entry.isExpired()) {
result.put(key, entry.getValue());
}
});
return result;
}
}
2. ConcurrentLinkedQueue:无锁的快速通道
2.1 无锁队列的魔法
传统队列就像只有一个入口的隧道,所有车辆必须排队。而ConcurrentLinkedQueue就像多入口的立体交通枢纽:
// 无锁队列的生动理解
public class LockFreeQueueAnalogy {
public void trafficHubComparison() {
// 传统阻塞队列:单入口隧道,经常堵车
// ConcurrentLinkedQueue:立体交通枢纽,多入口同时通行
// 秘密武器:CAS(Compare-And-Swap)算法
}
}
2.2 CAS:优雅的竞争解决
CAS就像礼貌的询问:
public class PoliteInquiry {
public void casAnalogy() {
// 传统加锁:像抢座位,谁先坐到就是谁的
// CAS无锁:像礼貌询问"这个座位有人吗?"
// 如果没人就坐下,有人就找下一个座位
}
}
2.3 实战示例:高并发任务处理器
/**
* 基于ConcurrentLinkedQueue的高性能任务处理器
* 像高效的快递分拣中心
*/
public class HighPerformanceTaskProcessor {
private final ConcurrentLinkedQueue<Runnable> taskQueue =
new ConcurrentLinkedQueue<>();
// 提交任务 - 无锁操作,极高吞吐量
public void submit(Runnable task) {
taskQueue.offer(task); // 像快递放入分拣流水线
startWorkerIfNeeded();
}
// 工作线程 - 无锁获取任务
private class Worker implements Runnable {
public void run() {
while (!Thread.currentThread().isInterrupted()) {
Runnable task = taskQueue.poll(); // 像从流水线取快递
if (task != null) {
task.run(); // 处理任务
}
}
}
}
}
3. 阻塞队列:有协调员的等待区
3.1 阻塞队列的四种行为模式
想象餐厅的四种接待方式:
public class RestaurantReception {
public void fourBehaviors() {
// 1. 抛出异常 - 霸道的服务员
// "没位置了!走开!"
// 2. 返回特殊值 - 礼貌的前台
// "抱歉现在没位置,您要不等会儿?"
// 3. 一直阻塞 - 耐心的门童
// "请您在这稍等,有位置我马上叫您"
// 4. 超时退出 - 体贴的经理
// "请您等待10分钟,如果还没位置我帮您安排其他餐厅"
}
}
3.2 七种阻塞队列:不同的餐厅风格
Java提供了7种阻塞队列,每种都有独特的"经营理念":
ArrayBlockingQueue:传统固定座位餐厅
// 有10个桌位的餐厅,公平模式
ArrayBlockingQueue<String> restaurant = new ArrayBlockingQueue<>(10, true);
LinkedBlockingQueue:可扩展的连锁餐厅
// 最大容纳1000人的餐厅
LinkedBlockingQueue<Order> orderQueue = new LinkedBlockingQueue<>(1000);
PriorityBlockingQueue:VIP贵宾厅
// 按客户等级服务的贵宾厅
PriorityBlockingQueue<Customer> vipLounge = new PriorityBlockingQueue<>();
DelayQueue:延时电影院
// 电影到点才能入场
DelayQueue<MovieScreening> schedule = new DelayQueue<>();
SynchronousQueue:一对一传球游戏
// 不存储元素,每个put必须等待一个take
SynchronousQueue<String> ballChannel = new SynchronousQueue<>(true);
3.3 实战示例:生产者-消费者模式
/**
* 生产者-消费者模式的完美实现
* 像工厂的装配流水线
*/
public class ProducerConsumerPattern {
private final BlockingQueue<Item> assemblyLine;
public ProducerConsumerPattern(int lineCapacity) {
this.assemblyLine = new ArrayBlockingQueue<>(lineCapacity);
}
// 生产者:原材料入库
public void startProducers(int count) {
for (int i = 0; i < count; i++) {
new Thread(() -> {
while (true) {
Item item = produceItem();
assemblyLine.put(item); // 流水线满时等待
}
}).start();
}
}
// 消费者:产品出库
public void startConsumers(int count) {
for (int i = 0; i < count; i++) {
new Thread(() -> {
while (true) {
Item item = assemblyLine.take(); // 流水线空时等待
consumeItem(item);
}
}).start();
}
}
}
4. Fork/Join框架:团队协作的智慧
4.1 分而治之的哲学
Fork/Join框架的核心理念:大事化小,小事并行,结果汇总
就像编写一本巨著:
传统方式:一个人从头写到尾
Fork/Join方式:分给多个作者同时写不同章节,最后汇总
4.2 工作窃取算法:聪明的互助团队
public class TeamWorkExample {
public void workStealingInAction() {
// 初始:4个工人,每人25个任务
// 工人A先完成自己的任务
// 工人B还有10个任务没完成
// 工作窃取:工人A从工人B的任务列表"偷"任务帮忙
// 结果:整体效率最大化,没有人闲着
}
}
4.3 实战示例:并行数组求和
/**
* 使用Fork/Join并行计算数组和
* 像团队协作完成大项目
*/
public class ParallelArraySum {
static class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000; // 阈值
private final long[] array;
private final int start, end;
public SumTask(long[] array, int start, int end) {
this.array = array; this.start = start; this.end = end;
}
@Override
protected Long compute() {
// 如果任务足够小,直接计算
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) sum += array[i];
return sum;
}
// 拆分成两个子任务
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
// 并行执行:一个fork,一个当前线程执行
leftTask.fork();
long rightResult = rightTask.compute();
long leftResult = leftTask.join();
return leftResult + rightResult;
}
}
public static void main(String[] args) {
long[] array = new long[1000000];
Arrays.fill(array, 1L); // 100万个1
ForkJoinPool pool = new ForkJoinPool();
long result = pool.invoke(new SumTask(array, 0, array.length));
System.out.println("计算结果: " + result); // 输出: 1000000
}
}
5. 性能对比与选择指南
5.1 不同场景的工具选择
使用场景 推荐工具 理由
高并发缓存 ConcurrentHashMap 分段锁,读多写少优化
任务队列 ConcurrentLinkedQueue 无锁,高吞吐量
资源池管理 LinkedBlockingQueue 阻塞操作,流量控制
优先级处理 PriorityBlockingQueue 按优先级排序
延时任务 DelayQueue 支持延时执行
直接传递 SynchronousQueue 零存储,直接传递
并行计算 Fork/Join框架 分治算法,工作窃取
5.2 性能优化要点
public class PerformanceTips {
public void optimizationGuidelines() {
// 1. 合理设置容量:避免频繁扩容或内存浪费
// 2. 选择合适的队列:根据业务特性选择
// 3. 避免过度同步:能用无锁就不用有锁
// 4. 注意异常处理:并发环境下的异常传播
// 5. 监控资源使用:避免内存泄漏和资源耗尽
}