文章目录
- 阻塞队列特点
- 生产者消费者模型
- 生产者消费者模型的意义
- 多线程环境使用阻塞队列
- 自己实现一个简单的阻塞队列
- 1. 先创建一个基本的阻塞队列类型
- 2. 考虑线程安全问题
- 3. wait 和 notify 实现阻塞
- 4.if 改为while
- 5.最终阻塞队列
- 6. 测试类
阻塞队列特点
- 阻塞队列是线程安全的
- 阻塞队列要有阻塞功能
- 当队列为空时,出队列就会出现阻塞,阻塞到其他线程入队列为止.
- 当队列为满时,入队列就会出现阻塞,阻塞到其他线程出队列为止.
生产者消费者模型
为了更好理解阻塞队列的作用,介绍一下生产者消费者模型,
大娘擀饺子皮给老王和老李包饺子,擀饺子皮就是生产者模型,包饺子就是消费者模型中间的桌子相当于阻塞队列.
- 若擀饺子皮的大娘速度很快,那么桌子很快就放不下了,那大娘就得停下来,等老李或老王包饺子再擀饺子皮.
- 若老王和老李包饺子的速度非常快,桌子空了,那老王和老李就得停下来,等大娘擀饺子皮再包饺子.
生产者消费者模型的意义
- 解耦合通过阻塞队列降低耦合度
- 削峰填谷由阻塞队列来承担数据峰值等情况.
用户通过前端页面向后端服务器A发送请求,服务器A通过阻塞队列分配到相应的服务器B或服务器C当中进行处理.
假设没有阻塞队列,服务器A直接连通B或C.当服务器B在某种情况工作不过来了,就会对服务器A甚至其他服务器造成重大影响,所以解耦合是极其重要的.
有了阻塞队列,就算是加入了新的服务器D连接A也可以几乎不改变服务器A的代码.
当遇上高峰期时,数据存储在阻塞队列中,由阻塞队列抗压
多线程环境使用阻塞队列
阻塞队列BlockingQueue是一个接口, 由以下几个类实现.
| 类型 | 描述 |
|---|---|
ArrayBlockingQueue | 基于数组实现的阻塞队列 |
LinkedBlockingQueue | 基于链表实现的阻塞队列 |
PriorityBlockingQueue | 基于堆实现的带优先级的阻塞队列 |
TransferQueue | 最多只包含一个元素的阻塞队列 |
阻塞队列的使用并不复杂:
- 实例化对象 ,传入参数为初始容量大小
BlockingQueue<Integer>blockingQueue=newArrayBlockingQueue<>(100);2.入队列和出队列
BlockingQueue的入队和出队为put()和take();具有阻塞特性put()没有返回值take()返回出队列的数据
Queue的入队列和出队列是offer()和poll();BlockingQueue也有offer()和poll();但是,offer()和poll()是没有阻塞特性的!
自己实现一个简单的阻塞队列
1. 先创建一个基本的阻塞队列类型
classMyBlockingQueue<E>{//阻塞队列泛型类privateE[]elems=null;privateinthead=0;//队头privateinttail=0;//队尾privateintsize=0;//数据量publicMyBlockingQueue(intcapacity){//构造方法elems=(E[])newObject[capacity];}publicvoidput(Eelem)throwsInterruptedException{if(size>=elems.length){// 队列满了.// 后续需要让这个代码能够阻塞.}// 新的元素要放到 tail 指向的位置上elems[tail]=elem;tail++;if(tail>=elems.length){tail=0;}size++;}publicEtake()throwsInterruptedException{Eelem=null;if(size==0){// 队列空了.// 后续也需要让这个代码阻塞}// 取出 head 位置的元素并返回elem=elems[head];head++;if(head>=elems.length){head=0;}size--;returnelem;}}2. 考虑线程安全问题
* 入队和出队都对数据进行更改,需要加锁classMyBlockingQueue<E>{//阻塞队列泛型类privateE[]elems=null;privateinthead=0;//队头privateinttail=0;//队尾privateintsize=0;//数据量// 准备锁对象, 如果使用 this 也可以.privatefinalObjectlocker=newObject();publicMyBlockingQueue(intcapacity){//构造方法elems=(E[])newObject[capacity];}publicvoidput(Eelem)throwsInterruptedException{if(size>=elems.length){// 队列满了.// 后续需要让这个代码能够阻塞.}synchronize(locker){// 新的元素要放到 tail 指向的位置上elems[tail]=elem;tail++;if(tail>=elems.length){tail=0;}size++;}}publicEtake()throwsInterruptedException{Eelem=null;if(size==0){// 队列空了.// 后续也需要让这个代码阻塞}synchronize(locker){// 取出 head 位置的元素并返回elem=elems[head];head++;if(head>=elems.length){head=0;}size--;}returnelem;}}if判断句中如果出现两个put在多线程刚好差一个满的情况也会出现线程安全问题.
所以,if也需要加上锁
classMyBlockingQueue<E>{privateE[]elems=null;privateinthead=0;privateinttail=0;privateintsize=0;// 准备锁对象, 如果使用 this 也可以.privatefinalObjectlocker=newObject();publicMyBlockingQueue(intcapacity){elems=(E[])newObject[capacity];}publicvoidput(Eelem)throwsInterruptedException{// 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.synchronized(locker){if(size>=elems.length){// 队列满了.// 后续需要让这个代码能够阻塞.}// 新的元素要放到 tail 指向的位置上elems[tail]=elem;tail++;if(tail>=elems.length){tail=0;}size++;// 入队列成功之后唤醒locker.notify();}}publicEtake()throwsInterruptedException{Eelem=null;synchronized(locker){if(size==0){// 队列空了.// 后续也需要让这个代码阻塞}// 取出 head 位置的元素并返回elem=elems[head];head++;if(head>=elems.length){head=0;}// 这个代码不要遗漏.size--;}returnelem;}}3. wait 和 notify 实现阻塞
- 当队列满时,
put()进入阻塞wait();当执行take()之后,通过notify()唤醒put() - 当队列空时,
take()进入阻塞wait();当执行put()之后,通过notify()唤醒take() - 如果没有线程处于wait阻塞,调用notify就不会产生任何结果
classMyBlockingQueue<E>{privateE[]elems=null;privateinthead=0;privateinttail=0;privateintsize=0;// 准备锁对象, 如果使用 this 也可以.privatefinalObjectlocker=newObject();publicMyBlockingQueue(intcapacity){elems=(E[])newObject[capacity];}publicvoidput(Eelem)throwsInterruptedException{// 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.synchronized(locker){if(size>=elems.length){// 队列满了.// 后续需要让这个代码能够阻塞.locker.wait();}// 新的元素要放到 tail 指向的位置上elems[tail]=elem;tail++;if(tail>=elems.length){tail=0;}size++;// 入队列成功之后唤醒locker.notify();}}publicEtake()throwsInterruptedException{Eelem=null;synchronized(locker){if(size==0){// 队列空了.// 后续也需要让这个代码阻塞locker.wait();}// 取出 head 位置的元素并返回elem=elems[head];head++;if(head>=elems.length){head=0;}// 这个代码不要遗漏.size--;// 元素出队列成功之后, 加上唤醒locker.notify();}returnelem;}}4.if 改为while
此时还有问题:
对此改进代码if更改成while,使wait能够多次判断,再次进入阻塞.
5.最终阻塞队列
classMyBlockingQueue<E>{privateE[]elems=null;privateinthead=0;privateinttail=0;privateintsize=0;// 准备锁对象, 如果使用 this 也可以.privatefinalObjectlocker=newObject();publicMyBlockingQueue(intcapacity){elems=(E[])newObject[capacity];}publicvoidput(Eelem)throwsInterruptedException{// 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.synchronized(locker){while(size>=elems.length){// 队列满了.// 后续需要让这个代码能够阻塞.locker.wait();}// 新的元素要放到 tail 指向的位置上elems[tail]=elem;tail++;if(tail>=elems.length){tail=0;}size++;// 入队列成功之后唤醒locker.notify();}}publicEtake()throwsInterruptedException{Eelem=null;synchronized(locker){while(size==0){// 队列空了.// 后续也需要让这个代码阻塞locker.wait();}// 取出 head 位置的元素并返回elem=elems[head];head++;if(head>=elems.length){head=0;}// 这个代码不要遗漏.size--;// 元素出队列成功之后, 加上唤醒locker.notify();}returnelem;}}6. 测试类
publicclassThreadDemo29{publicstaticvoidmain(String[]args)throwsInterruptedException{MyBlockingQueue<String>queue=newMyBlockingQueue<>(1000);// 生产者Threadt1=newThread(()->{intn=1;while(true){try{queue.put(n+"");System.out.println("生产元素 "+n);n++;}catch(InterruptedExceptione){thrownewRuntimeException(e);}}});// 消费者Threadt2=newThread(()->{while(true){try{Stringn=queue.take();System.out.println("消费元素 "+n);Thread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}}});t1.start();t2.start();}}