控制并发流程,做好线程间的协调

news/2024/5/10 23:26:48

一、概述

1. 什么是控制并发流程?

线程一般是由线程调度器自动控制的,但有些场景需要按照我们程序员的意愿去实现多线程之间相互配合,从而满足业务逻辑。比如:

  • 让线程A等待线程B执行完后再执行等一些相互合作的逻辑;
  • 或一系列线程等待一个线程运行完毕或发出信号之后再执行

2. 控制并发流程工具类

在这里插入图片描述

二、 CountDownLatch 倒计时门栓

倒数(向下计数、倒着计数)count为0后,那些执行了 await() 方法陷入阻塞的线程就被唤醒继续执行。就像去做过山车时,等到空余座位为0时,就会发车。

1. 主要方法介绍

  • CountDownLatch(int count):仅有这一个构造函数,参数count为需要倒数的数值。
  • await(): 调用 await() 方法的线程会被挂起,它会等待直到count值为0才继续执行。
  • countDown() :将count值减1,等到为0时,那些等待的线程会被唤起。

2. 图解

  • 在构造方法中指定倒数count值;
  • 调用await的线程Ta会被挂起;
  • 每调用countDown(),倒数count会减1;但是该线程不会被挂起,依然执行
  • 当倒数count值为0时,之前执行await的线程就被唤醒,开始执行

3、代码演示

(1)用法①:一等多

一个线程等待多个线程都发出信号后,再继续自己的工作:

/***      工厂中,质检,5个工人检查,当5个人都认为通过,才认为这个质检通过*/
public class CountDownLatchDemo1 {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(5);//指定倒数count为5ExecutorService pool = Executors.newFixedThreadPool(5);//线程池创建5个线程//5次任务质检for (int i = 0; i < 5; i++) {int no = i+1;Runnable runnable = new Runnable() {@Overridepublic void run() {try {Thread.sleep((long) (Math.random() * 10000));System.out.println("No." + no + ":质检完毕");} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();//倒计count减1}}};pool.submit(runnable);}System.out.println("等待5个人质检完。。。。。");countDownLatch.await();//主线程等待倒时count=0,释放所有挂起线程,并主线进行工作System.out.println("质检完成");}}

image-20230615103516182

(2) 用法②:多等一

多个线程等待某个线程发出信号后,同时开始执行

/***      模拟100m跑步,5名选手都准备好了,只能裁判员一生令下,5人同时跑出*/
public class CountDownLatchDemo2 {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);ExecutorService pool = Executors.newFixedThreadPool(5);//线程池创建5个线程for (int i = 0; i < 5; i++) {int no = i+1;Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println("NO." + no + ":准备完毕,等待发令");try {countDownLatch.await();System.out.println("No." + no + ":开始跑步");} catch (InterruptedException e) {e.printStackTrace();}}};pool.submit(runnable);}//主线程,模拟裁判员Thread.sleep(5000);System.out.println("发令枪响,比赛开始");countDownLatch.countDown();}
}

(3)综合用法:多等一 & 一等多

多等一:5名运动员等待裁判员打枪开跑;一等多:裁判员等5名运动员到达终点;

/***      模拟100m跑步,5名选手都准备好了,只能裁判员一生令下,5人同时跑出,当所有人都到终点后,比赛结束*/
public class CountDownLatchDemo3 {public static void main(String[] args) throws InterruptedException {CountDownLatch begin = new CountDownLatch(1);CountDownLatch end = new CountDownLatch(5);ExecutorService pool = Executors.newFixedThreadPool(5);//线程池创建5个线程for (int i = 0; i < 5; i++) {int no = i+1;Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println("NO." + no + ":准备完毕,等待发令");try {begin.await();System.out.println("No." + no + ":开始跑步");Thread.sleep((long) (Math.random()*10000));System.out.println("No." + no + ":到达终点");} catch (InterruptedException e) {e.printStackTrace();}finally {end.countDown();}}};pool.submit(runnable);}//主线程,模拟裁判员Thread.sleep(5000);System.out.println("发令枪响,比赛开始");begin.countDown();end.await();System.out.println("比赛结束!");}
}

4、注意点

  • 他不能重复使用,当倒数count=0,该实例就失效了;
  • 如果要再次使用,需要再实例化新的对象
  • 可以实现多等多的情况

三、Semaphore 信号量

1. 作用

对于一些重量级服务,如执行时间长、处理消耗资源大,设置一下同时并发执行任务的线程个数,从而保障服务平稳运行。

  • 用来限制或管理数量的有限资源的使用情况
  • 类似于生活中的"许可证",许可证数量有限,并且只有拿到“许可证”的线程才允许运行

2. 图解

3. 重要方法

  • new Semaphore(int permits,boolean fair):初始化Semaphore并指定许可证的数量。这里可以设置是否使用公平策略,如果传入true,那么Semaphore会把之前等待的线程放到FIFO的队列里,以便于当有了新的许可证可以分发给之前等了最长时间的线程;
  • tryAcquire() :看看现在有没有空闲的许可证,如果有的话就获取,如果没有的话也没关系,我不会陷入阻塞,我可以去做别的事,过一会再来查看许可证的空闲情况。
  • tryAcquire(timeout): 和tryAcquire() 一样,但是多了一个超时时间,比如“在3秒内获取不到许可证,我就去做别的事”
  • acquire():获取许可证,可响应中断
  • acquireUninterruptibly():获取许可证,拒绝响应中断
  • release():释放许可证

4、代码演示

(1)一般用法:

public class SemaphoreDemo {static Semaphore semaphore = new Semaphore(3,true);public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(50);for (int i = 0; i < 100; i++) {pool.submit(new Task());}pool.shutdown();}static class Task implements Runnable{@Overridepublic void run() {try {semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+":拿到许可证");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+":释放许可证");semaphore.release();}}
}

image-20230615110440704

(2) 特殊用法

一次性获取或释放多个许可证。

public class SemaphoreDemo {static Semaphore semaphore = new Semaphore(5, true);public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(50);for (int i = 0; i < 100; i++) {service.submit(new Task());}service.shutdown();}static class Task implements Runnable {@Overridepublic void run() {try {semaphore.acquire(3);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "拿到了许可证");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "释放了许可证");semaphore.release(3);}}
}

什么时候需要一次性获取多个许可证:

比如 TaskA 会调用很消耗资源的 method1(),而 TaskB 调用的是不太消耗资源的 method2(),假设我们一共有5个许可证。那么我们就可以要求 TaskA 获取5个许可证才能执行,而 TaskB 只需要获取到一个许可证就能执行,这样就避免了A和B同时运行的情况,我们可以根据自己的需求,通过分配许可证的方式合理分配资源。

5、注意点

  • 获取&释放的许可证数量要求必须一致,否则程序运行到最后,许可证都会被占用,就会都陷入阻塞
  • 根据情况设置公平性,一般设置为true,这样可以避免线程饥饿
  • 释放和获取对线程没有要求,可以由这个线程A获取,别的线程B释放
  • 可以将它实现成一个轻量级的CountDownLatch,比如信号量Semaphore的许可证数量为1,线程A获取到,线程B执行 acquire() 再获取时,就会陷入阻塞,线程A 执行release() 之后,线程B才能执行,相当于 CountDownLatch 的唤醒

四、Condition条件对象

1. 作用

当线程1需要等待某个条件的时候,它就去执行condition.await0方法,一旦执行了await0方法,线程就会进入阻塞状态。

然后通常会有另外一个线程,假设是线程2,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行condition.signal() 方法,这时JVM就会从被阻塞的线程里,找到那些等待该condition的线程,这时线程1就会收到可执行信号,它的线程状态就会变成Runnable可执行状态

2. 图解

3、signalAll() 和 signal() 的区别

  • signalAll()唤醒所有等待的线程,signal()唤醒一个
  • signal()是公平的,会唤醒等待时间最长的线程

4、代码演示

(1)基本用法

condition一般是绑定在锁lock上面的,基本用法如下:

public class ConditionDemo1 {private ReentrantLock lock = new ReentrantLock();private Condition condtion = lock.newCondition();void method1() throws InterruptedException {lock.lock();try{System.out.println("条件不满足,开始wait");condtion.await();System.out.println("条件满足,开始执行后续的任务");}finally {lock.unlock();}}void method2(){lock.lock();try{System.out.println("准备工作完成,开始唤醒其他线程");condtion.signal();}finally {lock.unlock();}}//主函数public static void main(String[] args) throws InterruptedException {ConditionDemo1 demo1 = new ConditionDemo1();//主线程创建一个线程1new Thread(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000);demo1.method2();//线程1,1秒后,唤醒主线程} catch (InterruptedException e) {e.printStackTrace();}}}).start();demo1.method1();//主线程阻塞}}

image-20230615112138646

(2)用Condition实现生产者消费者模式

/***      演示Condition实现生产者消费者模式*/
public class ConditionDemo2 {private static int queueSize = 10;private static PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);private static ReentrantLock lock = new ReentrantLock();private static Condition noFull = lock.newCondition();private static Condition notEmpty = lock.newCondition();//消费者static class Consumer extends Thread {@Overridepublic void run() {comsume();}//消费操作void comsume(){while (true){lock.lock();try {while (queue.size()==0){System.out.println("队列空,等待数据");notEmpty.await();}queue.poll();noFull.signalAll();System.out.println("从队列里取走了一个数据,队列还剩余空间"+(queueSize-queue.size())+"个元素");} catch (InterruptedException e) {e.printStackTrace();}}}}//生产者static class Producer extends Thread {@Overridepublic void run() {produce();}//生产操作void produce(){while (true){lock.lock();try {while (queue.size()==queueSize){System.out.println("队列满,等待消费");noFull.await();}queue.offer(1);notEmpty.signalAll();System.out.println("给队列生成了一个数据,队列有"+queue.size()+"个元素");} catch (InterruptedException e) {e.printStackTrace();}}}}//主函数public static void main(String[] args) {Consumer consumer = new Consumer();Producer producer = new Producer();consumer.start();producer.start();}}

5. 注意点

  • 实际上,如果说Lock可以用来代替synchronized,那么Condition就可以用来代替相对应的Obiect.wait/notify的,所以Condition在用法和性质上,几乎和Obiect.wait/notify都一样
  • await方法执行后会自动释放持有的Lock锁,和Object.wait一样,不需要自己手动先释放锁
  • 调用await的时候,必须持有锁,否则会抛出异常,和Object.wait一样

五、CyclicBarrier循环栅栏

1. 作用

  • CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程
  • 当有大量线程相互配合,分别计算不同任务,并且需要最后统汇总的时候,我们可以使用CyclicBarrier。CvclicBarrier可以构造一个集结点,每一个线程执行完毕后,都会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务
  • 就像我们在生活中聚会时,首先约定咱们3个人明天中午在学校碰面,都到齐后再一起继续接下来的安排。
  • CyclicBarrier循环栅栏是可以重复使用的,这一点和 CountDownLatch 不一样

2、代码演示

public class CyclicBarrierDemo {public static void main(String[] args) {//参数1:设置几个等待数//参数2:当线程数满足条件后,执行的任务CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {@Overridepublic void run() {System.out.println("五个到齐,走一波");System.out.println(Thread.currentThread().getName());}});//创建10个线程for (int i = 0; i < 10; i++) {new Thread(new Task(i,cyclicBarrier)).start();}}static class Task implements Runnable{private int id;private CyclicBarrier cyclicBarrier;public Task(int id, CyclicBarrier cyclicBarrier) {this.id = id;this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {System.out.println("线程"+id+",现在前往集合地点");try {Thread.sleep((long) (Math.random()*10000));System.out.println("线程"+id+":达到集合地点,开始等待其他人到达");cyclicBarrier.await();//陷入等待System.out.println("线程"+id+":出发了!!!");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}}}

每当凑齐五个之后,就出发一波,cyclicBarrier可以被重复使用。

3、CountDownLatch & CyclicBarrier 的区别

  • 作用不同

    • CountDownLatch对应的是事件,完成某个事件就可以调用countDown(),一个线程可执行多次countDown(),且该线程不会阻塞
    • CyclicBarrier对应的是线程,每个线程都执行await(),线程执行完await()会阻塞,达到指定的数量才会继续运行,
  • 可重用性不同

    • CountDownLatch:倒数count到0之后,该实例就不能再使用
    • CyclicBarrier:满足指定的数量条件后,就继续执行,且可再次使用
  • 结束后统一工作

    • CountDownLatch:结束后就只是唤醒线程继续工作
    • CyclicBarrier:可以在CyclicBarrier的构造函数中自定义 runnable 任务,结束后会执行该任务

点我扫码关注微信公众号

文章来源:控制并发流程,做好线程间的协调


个人微信:CaiBaoDeCai

微信公众号名称:Java知者

微信公众号 ID: JavaZhiZhe

谢谢关注!


http://wed.xjx100/news/301234.html

相关文章

MP : Human Motion 人体运动的MLP方法

Back to MLP: A Simple Baseline for Human Motion Prediction conda install -c conda-forge easydict 简介 papercodehttps://arxiv.org/abs/2207.01567v2https://github.com/dulucas/siMLPe Back to MLP是一个仅使用MLP的新baseline,效果SOTA。本文解决了人类运动预测的问…

通过构造方法使属性初始化

1 问题 如何使属性初始化。 2 方法 在Student类中定义两个构造方法publicStudent(String name)和public Student(String name,int score)。在使用new运算符创建对象&#xff0c;由于实际参数是一个String类型的数据"林冲"&#xff0c;因此在实例化时会调用有一个Stri…

使用CloudOS帮助企业落地云原生PaaS平台

PaaS究竟是什么&#xff1f; IaaS、SaaS的定义很清楚&#xff0c;而PaaS的定义就比较宽泛。所以&#xff0c;很多人把PaaS当做一个万能的“框”&#xff0c;什么都往里装&#xff0c;特别像一排垃圾桶中的那个“其他垃圾”桶&#xff0c;当你拎了一袋垃圾&#xff0c;不知道往…

Vue中如何进行颜色选择与调色板

Vue中如何进行颜色选择与调色板 颜色选择和调色板是Web开发中常用的功能&#xff0c;它们可以帮助用户选择或调整颜色。Vue作为一个流行的JavaScript框架&#xff0c;提供了一些工具和库&#xff0c;可以方便地实现颜色选择和调色板功能。本文将介绍如何在Vue中进行颜色选择和…

5.3.2 因特网的路由协议(二)基于距离向量算法的RIP协议

5.3.2 因特网的路由协议&#xff08;二&#xff09;基于距离向量算法的RIP协议 一、RIP协议概念 RIP是Routing Information Protocol缩写&#xff0c;又称为路由信息协议&#xff0c;是最先得到应用的内部网关协议&#xff0c;RIP作为一个常在小型互联网中使用的路由信息协议…

C++11学习笔记(3)——通用工具(上)(包含重要特性智能指针Smart pointer)

1.Pair 在C11中&#xff0c;std::pair是一个模板类&#xff0c;用于将两个值组合成一个单元。它可以将两个不同的类型的值配对在一起&#xff0c;并且提供了对这对值的访问和操作。 std::pair的定义 template<class T1, class T2> struct pair{T1 first;T2 second; };…

基于DDD实现的用户注册流程,很优雅!

欢迎回来&#xff0c;我是飘渺。今天继续更新DDD&微服务的系列文章。 在前面的文章中&#xff0c;我们深入探讨了DDD的核心概念。我理解&#xff0c;对于初次接触这些概念的你来说&#xff0c;可能难以一次性完全记住。但别担心&#xff0c;学习DDD并不仅仅是理论的理解&am…

工商业储能解读

工商业储能解读 0、前言1、2022-2023年工商业储能相关利好政策1.1 2022年1月4日1.2 2022年1月18日1.3 2022年2月10日1.4 2022年3月21日1.5 2022年3月22日1.6 2022年3月29日1.7 2022年4月2日1.8 2022年4月13日1.9 2022年4月25日1.10 2022年5月25日1.11 2022年5月30日1.12 2022年…