news 2026/4/25 12:55:10

多线程代码案例2-阻塞队列

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
多线程代码案例2-阻塞队列

文章目录

  • 阻塞队列特点
  • 生产者消费者模型
    • 生产者消费者模型的意义
  • 多线程环境使用阻塞队列
    • 自己实现一个简单的阻塞队列
      • 1. 先创建一个基本的阻塞队列类型
      • 2. 考虑线程安全问题
      • 3. wait 和 notify 实现阻塞
      • 4.if 改为while
      • 5.最终阻塞队列
      • 6. 测试类

阻塞队列特点

  1. 阻塞队列是线程安全
  2. 阻塞队列要有阻塞功能
    • 当队列为空时,出队列就会出现阻塞,阻塞到其他线程入队列为止.
    • 当队列为满时,入队列就会出现阻塞,阻塞到其他线程出队列为止.

生产者消费者模型

为了更好理解阻塞队列的作用,介绍一下生产者消费者模型,

大娘擀饺子皮给老王和老李包饺子,擀饺子皮就是生产者模型,包饺子就是消费者模型中间的桌子相当于阻塞队列.

  • 若擀饺子皮的大娘速度很快,那么桌子很快就放不下了,那大娘就得停下来,等老李或老王包饺子再擀饺子皮.
  • 若老王和老李包饺子的速度非常快,桌子空了,那老王和老李就得停下来,等大娘擀饺子皮再包饺子.

生产者消费者模型的意义

  1. 解耦合通过阻塞队列降低耦合度
  2. 削峰填谷由阻塞队列来承担数据峰值等情况.

  • 用户通过前端页面向后端服务器A发送请求,服务器A通过阻塞队列分配到相应的服务器B或服务器C当中进行处理.

  • 假设没有阻塞队列,服务器A直接连通B或C.当服务器B在某种情况工作不过来了,就会对服务器A甚至其他服务器造成重大影响,所以解耦合是极其重要的.

  • 有了阻塞队列,就算是加入了新的服务器D连接A也可以几乎不改变服务器A的代码.

  • 当遇上高峰期时,数据存储在阻塞队列中,由阻塞队列抗压

多线程环境使用阻塞队列

阻塞队列BlockingQueue是一个接口, 由以下几个类实现.

类型描述
ArrayBlockingQueue基于数组实现的阻塞队列
LinkedBlockingQueue基于链表实现的阻塞队列
PriorityBlockingQueue基于实现的带优先级的阻塞队列
TransferQueue最多只包含一个元素的阻塞队列

阻塞队列的使用并不复杂:

  1. 实例化对象 ,传入参数为初始容量大小
BlockingQueue<Integer>blockingQueue=newArrayBlockingQueue<>(100);

2.入队列和出队列

  • BlockingQueue的入队和出队为put()take();具有阻塞特性
    • put()没有返回值
    • take()返回出队列的数据
  • Queue的入队列和出队列是offer()poll();
  • BlockingQueue也有offer()poll();但是,offer()poll()没有阻塞特性的!

自己实现一个简单的阻塞队列

1. 先创建一个基本的阻塞队列类型

classMyBlockingQueue<E>{//阻塞队列泛型类privateE[]elems=null;privateinthead=0;//队头privateinttail=0;//队尾privateintsize=0;//数据量publicMyBlockingQueue(intcapacity){//构造方法elems=(E[])newObject[capacity];}publicvoidput(Eelem)throwsInterruptedException{if(size>=elems.length){// 队列满了.// 后续需要让这个代码能够阻塞.}// 新的元素要放到 tail 指向的位置上elems[tail]=elem;tail++;if(tail>=elems.length){tail=0;}size++;}publicEtake()throwsInterruptedException{Eelem=null;if(size==0){// 队列空了.// 后续也需要让这个代码阻塞}// 取出 head 位置的元素并返回elem=elems[head];head++;if(head>=elems.length){head=0;}size--;returnelem;}}

2. 考虑线程安全问题

* 入队和出队都对数据进行更改,需要加锁
classMyBlockingQueue<E>{//阻塞队列泛型类privateE[]elems=null;privateinthead=0;//队头privateinttail=0;//队尾privateintsize=0;//数据量// 准备锁对象, 如果使用 this 也可以.privatefinalObjectlocker=newObject();publicMyBlockingQueue(intcapacity){//构造方法elems=(E[])newObject[capacity];}publicvoidput(Eelem)throwsInterruptedException{if(size>=elems.length){// 队列满了.// 后续需要让这个代码能够阻塞.}synchronize(locker){// 新的元素要放到 tail 指向的位置上elems[tail]=elem;tail++;if(tail>=elems.length){tail=0;}size++;}}publicEtake()throwsInterruptedException{Eelem=null;if(size==0){// 队列空了.// 后续也需要让这个代码阻塞}synchronize(locker){// 取出 head 位置的元素并返回elem=elems[head];head++;if(head>=elems.length){head=0;}size--;}returnelem;}}
  • if判断句中如果出现两个put在多线程刚好差一个满的情况也会出现线程安全问题.

    所以,if也需要加上锁
classMyBlockingQueue<E>{privateE[]elems=null;privateinthead=0;privateinttail=0;privateintsize=0;// 准备锁对象, 如果使用 this 也可以.privatefinalObjectlocker=newObject();publicMyBlockingQueue(intcapacity){elems=(E[])newObject[capacity];}publicvoidput(Eelem)throwsInterruptedException{// 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.synchronized(locker){if(size>=elems.length){// 队列满了.// 后续需要让这个代码能够阻塞.}// 新的元素要放到 tail 指向的位置上elems[tail]=elem;tail++;if(tail>=elems.length){tail=0;}size++;// 入队列成功之后唤醒locker.notify();}}publicEtake()throwsInterruptedException{Eelem=null;synchronized(locker){if(size==0){// 队列空了.// 后续也需要让这个代码阻塞}// 取出 head 位置的元素并返回elem=elems[head];head++;if(head>=elems.length){head=0;}// 这个代码不要遗漏.size--;}returnelem;}}

3. wait 和 notify 实现阻塞

  • 当队列满时,put()进入阻塞wait();当执行take()之后,通过notify()唤醒put()
  • 当队列空时,take()进入阻塞wait();当执行put()之后,通过notify()唤醒take()
  • 如果没有线程处于wait阻塞,调用notify就不会产生任何结果
classMyBlockingQueue<E>{privateE[]elems=null;privateinthead=0;privateinttail=0;privateintsize=0;// 准备锁对象, 如果使用 this 也可以.privatefinalObjectlocker=newObject();publicMyBlockingQueue(intcapacity){elems=(E[])newObject[capacity];}publicvoidput(Eelem)throwsInterruptedException{// 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.synchronized(locker){if(size>=elems.length){// 队列满了.// 后续需要让这个代码能够阻塞.locker.wait();}// 新的元素要放到 tail 指向的位置上elems[tail]=elem;tail++;if(tail>=elems.length){tail=0;}size++;// 入队列成功之后唤醒locker.notify();}}publicEtake()throwsInterruptedException{Eelem=null;synchronized(locker){if(size==0){// 队列空了.// 后续也需要让这个代码阻塞locker.wait();}// 取出 head 位置的元素并返回elem=elems[head];head++;if(head>=elems.length){head=0;}// 这个代码不要遗漏.size--;// 元素出队列成功之后, 加上唤醒locker.notify();}returnelem;}}

4.if 改为while

此时还有问题:

对此改进代码if更改成while,使wait能够多次判断,再次进入阻塞.

5.最终阻塞队列

classMyBlockingQueue<E>{privateE[]elems=null;privateinthead=0;privateinttail=0;privateintsize=0;// 准备锁对象, 如果使用 this 也可以.privatefinalObjectlocker=newObject();publicMyBlockingQueue(intcapacity){elems=(E[])newObject[capacity];}publicvoidput(Eelem)throwsInterruptedException{// 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.synchronized(locker){while(size>=elems.length){// 队列满了.// 后续需要让这个代码能够阻塞.locker.wait();}// 新的元素要放到 tail 指向的位置上elems[tail]=elem;tail++;if(tail>=elems.length){tail=0;}size++;// 入队列成功之后唤醒locker.notify();}}publicEtake()throwsInterruptedException{Eelem=null;synchronized(locker){while(size==0){// 队列空了.// 后续也需要让这个代码阻塞locker.wait();}// 取出 head 位置的元素并返回elem=elems[head];head++;if(head>=elems.length){head=0;}// 这个代码不要遗漏.size--;// 元素出队列成功之后, 加上唤醒locker.notify();}returnelem;}}

6. 测试类

publicclassThreadDemo29{publicstaticvoidmain(String[]args)throwsInterruptedException{MyBlockingQueue<String>queue=newMyBlockingQueue<>(1000);// 生产者Threadt1=newThread(()->{intn=1;while(true){try{queue.put(n+"");System.out.println("生产元素 "+n);n++;}catch(InterruptedExceptione){thrownewRuntimeException(e);}}});// 消费者Threadt2=newThread(()->{while(true){try{Stringn=queue.take();System.out.println("消费元素 "+n);Thread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}}});t1.start();t2.start();}}
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/25 12:55:09

s2-pro音色复用实战:从客户录音中提取声纹用于营销外呼系统

s2-pro音色复用实战&#xff1a;从客户录音中提取声纹用于营销外呼系统 1. 场景需求分析 在电话营销领域&#xff0c;使用真实客户的声音进行外呼可以显著提升接通率和转化率。传统方式需要客户录制大量语音样本&#xff0c;不仅效率低下&#xff0c;而且难以保证音质一致性。…

作者头像 李华
网站建设 2026/4/25 12:55:04

ARM SME2指令集:矩阵运算与AI加速技术解析

1. ARM SME2指令集概述在移动计算和边缘AI领域&#xff0c;性能与能效的平衡一直是芯片设计的核心挑战。ARMv9架构引入的SME2&#xff08;Scalable Matrix Extension 2&#xff09;扩展&#xff0c;正是针对这一挑战的解决方案。作为SVE2&#xff08;Scalable Vector Extension…

作者头像 李华
网站建设 2026/4/25 12:55:01

MarkDownload:从网页到笔记,三步打造你的知识库

MarkDownload&#xff1a;从网页到笔记&#xff0c;三步打造你的知识库 【免费下载链接】markdownload A Firefox and Google Chrome extension to clip websites and download them into a readable markdown file. 项目地址: https://gitcode.com/gh_mirrors/ma/markdownlo…

作者头像 李华
网站建设 2026/4/25 12:52:49

WeDLM-7B-Base GPU部署:NVIDIA Triton推理服务器封装与批量请求优化

WeDLM-7B-Base GPU部署&#xff1a;NVIDIA Triton推理服务器封装与批量请求优化 1. 模型概述与核心优势 WeDLM-7B-Base是一款基于扩散机制&#xff08;Diffusion&#xff09;的高性能基座语言模型&#xff0c;拥有70亿参数规模。该模型在标准因果注意力机制下实现了并行掩码恢…

作者头像 李华