news 2026/2/10 11:18:05

面试官:你的项目哪里用到了 Disruptor?

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
面试官:你的项目哪里用到了 Disruptor?

Disruptor 是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 选择大奖)。

Disruptor 提供的功能类似于 Kafka 、RocketMQ 这类分布式队列,不过,其作为范围是 JVM(内存)。

Disruptor 解决了 JDK 内置线程安全队列的性能和内存安全问题。

JDK 中常见的线程安全的队列如下:

队列名字是否有界
ArrayBlockingQueue加锁(ReentrantLock有界
LinkedBlockingQueue加锁(ReentrantLock有界
LinkedTransferQueue无锁(CAS无界
ConcurrentLinkedQueue无锁(CAS无界

从上表中可以看出:这些队列要不就是加锁有界,要不就是无锁无界。而加锁的的队列势必会影响性能,无界的队列又存在内存溢出的风险。

因此,一般情况下,我们都是不建议使用 JDK 内置线程安全队列。

Disruptor 就不一样了!它在无锁的情况下还能保证队列有界,并且还是线程安全的。

1.广播场景

广播场景在我们的开发工作中并不少见,比如系统收到上游系统的一个请求消息,然后把这个消息发送给多个下游系统来处理。Disruptor 支持广播模式。比如消费者生产的消息由三个消费者来消费:

public class Broadcast { public static void main(String[] args) throws InterruptedException { int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1"); EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2"); EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3"); disruptor.handleEventsWith(consumer1, consumer2, consumer3); disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); Thread.sleep(1000); } } }

2.日志收集

再来看一个日志收集的例子。这里我们假设一个场景,业务系统集群有 3 个节点,每个节点打印的业务日志发送到 Disruptor,Disruptor 下游有 3 个消费者负责日志收集。

这里我们需要重新定义一个日志收集处理类,代码如下:

public class LogCollectHandler implements WorkHandler<LongEvent> { public LogCollectHandler(String consumer) { this.consumer = consumer; } private String consumer; @Override public void onEvent(LongEvent event) { System.out.println("consumer: " + consumer + ",Event: " + event); } }

下面这个代码是绑定消费者的代码:

public static void main(String[] args) throws InterruptedException { int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); WorkHandler<LongEvent> consumer1 = new LogCollectHandler("consumer1"); WorkHandler<LongEvent> consumer2 = new LogCollectHandler("consumer2"); WorkHandler<LongEvent> consumer3 = new LogCollectHandler("consumer3"); disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3); disruptor.start(); }

需要注意的是,上面使用的是 Disruptor 的handleEventsWithWorkerPool方法,使用的消费者不是EventHandler,而是WorkHandler。消费者组里面的消费者如果是WorkHandler,那消费者之间就是有竞争的,比如一个 Event 已经被 consumer1 消费过,那就不再会被其他消费者消费了。消费者组里面的消费者如果是EventHandler,那消费者之间是没有竞争的,所有消息都会消费。

3.责任链

责任链这种设计模式我们都比较熟悉了,同一个对象的处理有多个不同的逻辑,每个逻辑作为一个节点组成责任链,比如收到一条告警消息,处理节点分为:给开发人员发送邮件、给运维人员发送短信、给业务人员发送 OA 消息。

Disruptor 支持链式处理消息,看下面的示例代码:

public static void main(String[] args) throws InterruptedException { int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1"); EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2"); EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3"); disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3); disruptor.start(); }

Disruptor 也支持多个并行责任链,下图是 2 条责任链的场景:

图片

这里给出一个示例代码:

public static void main(String[] args) throws InterruptedException { int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1"); EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2"); EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3"); EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4"); EventHandler<LongEvent> consumer5 = new LongEventHandler("consumer5"); EventHandler<LongEvent> consumer6 = new LongEventHandler("consumer6"); disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3); disruptor.handleEventsWith(consumer4).then(consumer5).then(consumer6); disruptor.start(); }

4.多任务协作

一个经典的例子,我们在泡咖啡之前,需要烧水、洗被子、磨咖啡粉,这三个步骤可以并行,但是需要等着三步都完成之后,才可以泡咖啡。

当然,这个例子可以用 Java 中的 CompletableFuture 来实现,代码如下:

public static void main(String[] args){ ExecutorService executor = ...; CompletableFuture future1 = CompletableFuture.runAsync(() -> { try { washCup(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor); CompletableFuture future2 = CompletableFuture.runAsync(() -> { try { hotWater(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor); CompletableFuture future3 = CompletableFuture.runAsync(() -> { try { grindCoffee(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor); CompletableFuture.allOf(future1, future2, future3).thenAccept( r -> { System.out.println("泡咖啡"); } ); System.out.println("我是主线程"); }

同样,使用 Disruptor 也可以实现这个场景,看下面代码:

public static void main(String[] args) throws InterruptedException { int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1"); EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2"); EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3"); EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4"); disruptor.handleEventsWith(consumer1, consumer2, consumer3).then(consumer4); disruptor.start(); }

5.多消费者组

类比主流消息队列的场景,Disruptor 也可以实现多消费者组的场景,组间并行消费互不影响,组内消费者竞争消息,如下图:

示例代码如下:

public static void main(String[] args) throws InterruptedException { int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); WorkHandler<LongEvent> consumer1 = new LogWorkHandler("consumer1"); WorkHandler<LongEvent> consumer2 = new LogWorkHandler("consumer2"); WorkHandler<LongEvent> consumer3 = new LogWorkHandler("consumer3"); WorkHandler<LongEvent> consumer4 = new LogWorkHandler("consumer4"); WorkHandler<LongEvent> consumer5 = new LogWorkHandler("consumer5"); WorkHandler<LongEvent> consumer6 = new LogWorkHandler("consumer6"); disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3); disruptor.handleEventsWithWorkerPool(consumer4, consumer5, consumer6); disruptor.start(); }

6.总结

通过消费者的灵活组合,Disruptor 的使用场景非常丰富。本文介绍了 Disruptor 的 5 个典型使用场景。在选型的时候,除了使用场景,更多地要考虑到 Disruptor 作为高性能内存队列的这个特点。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/7 18:56:43

IDEA、Ncvicat 等 pojie 你懂的

Pojie 软件下载百度网盘下载&#xff1a; 链接: 通过网盘分享的文件&#xff1a; 链接: https://pan.baidu.com/s/1oPGXE0MjV4-grt--TSnIDw?pwd5201 提取码: 5201JDK 一键切换&#xff0c;其他的都可以一键pojie&#xff0c;无需额外操作。

作者头像 李华
网站建设 2026/2/7 4:39:30

2026专题直播第1期:如何快速搭建一个3D交互式场景?

2025年&#xff0c;我们连续做过几期专题直播&#xff0c;但后来因为其他工作耽误&#xff0c;就中断了较长一段时间。2026年&#xff0c;在时间允许的情况下&#xff0c;我们尽量多给你分享一些与GIS相关的行业专题直播。2026年第1期&#xff0c;我们联合上海漂视网络股份有限…

作者头像 李华
网站建设 2026/2/8 10:49:36

快看 !计算机专业包括哪些专业?计算机类18个专业全面解读,附就业方向,收藏这一篇就够了

计算机类专业介绍 在《普通高等学校本科专业目录&#xff08;2020年版&#xff09;》中&#xff0c;计算机专业是个大类&#xff0c;包括计算机科学与技术、软件工程、网络工程、信息安全、物联网工程、数字媒体技术****、智能科学与技术、空间信息与数字技术、电子与计算机工程…

作者头像 李华
网站建设 2026/2/7 18:15:33

又是跟AI学习的一天之带参数的装饰器

带参数的装饰器一般要写三层嵌套&#xff0c;示例# 第一层&#xff1a;接收装饰器参数 def require_permission(permission_codeNone, data_scope_checkFalse):# 第二层&#xff1a;接收被装饰的函数def decorator(fn):# 第三层&#xff1a;接收函数调用时的参数&#xff0c;并…

作者头像 李华
网站建设 2026/2/6 3:59:49

基于51单片机的智能散热风扇设计

摘要 基于51单片机的智能散热风扇的设计主要可以分为监测单元、控制单元和中央处理单元。本系统采用单片机STC89C52RC作为主控制器&#xff0c;采用数字型温度传感器DS18B20作为监测单元&#xff0c;主要就是采集温度数据&#xff0c;通过一系列的计算转化成实际温度数值。为了…

作者头像 李华
网站建设 2026/2/10 8:12:25

搬运机械手及其控制系统设计

第二章 总体方案确定 2.1 总体方案论证 机械手主要由执行机构、驱动系统、控制系统以及位置检测装置等所组成。 对气动机械手的基本要求是能快速、准确地拾一放和搬运物件&#xff0c;这就要求它们具有高精度、快速反应、一定的承载能力、足够的工作空间和灵活的自由度及在任意…

作者头像 李华