本文转载自微信公众号「无敌码农」,作者无敌码农。转载本文请联系无敌码农公众号。
大家好!我是”无敌码农”。今天的文章将给大家分享Java并发编程相关的知识点,虽然[ z = H u j *类似的文章已有很多,但本文将以更贴近实际使用场景的方式进行阐述。具体将对Java常见的并发编程方式和手段进行总结,以便可以从使用角度更好地感知Java并发编E o j Z .程带来的效果,从而为后续更深入的理解Java并发机制进行铺垫。
Java多线程概述
在Java中使用多线程是提高程序并发响应能力的重要手段,但同时它也是一把双刃剑;如果使用不当也很容易导致程序出错,并且还很难直观地找到问题。这` [ N T $ 7 .是因为:1)、线程运行本身是由操作系统调度,具有一定的随机性;2)、Java共J + G享内` V S 8存模型在多线程环境下很容易产生线程安全问题;3)、不合理的封装依赖,极容易导致发布对象的不经意逸出。
所以,q O R l g s ? / Q要用好多线程这把剑,就需要对Java内存模型、线程安全问题有较深的认识。但由于Java丰富的生态,在实际研发工作中,需要我们自己进行并发处理的场景大都被各类框架或Y I d Y m z ! 6组件给屏蔽了。这也是造成很多Java开发人员对并发编程意识淡薄的主要原因。
首先从Java} T f q h内存模型的角度理解下使用多线程编程最核心的问题,具体如下图所示:
如上图所示,在Java内存模型中,对于用户程序来说用得最频繁的就是堆内存和栈内存,其中堆内存主要存放对象及数组,例如由new()产生的实例。而栈内存则主要是存储运行方\ p ) x Y法时所需的局8 T @ & P U v E部变量、操作数及方法出口等信息。
其中堆内存是线程共享的,一个类被实例化后生成的对象、及对象中定义的成员变量可以被多个线程共享a ! \ o访问,这种共享主要体现在多个线程同时执行、同一个对象实例的某个方= m Q 1 W ^ . P法时,会将该方法中操作的对象成员变量分别以多个副本的方式拷贝到方法栈中进行操作,而不是c % . y / ) q直接修改堆内存中对象的成员变量值;线程操作完成后,会再次将修改后的变量值同步至堆内存中的主内存地址,并实现对其他线程的可见。
这个过程虽然看似行云流水,但在JVM中却至少需要6个原子步骤才能完成,具体如下图所示:
如上图所示,在不考虑对共享变量进行加锁\ Z – 8 Q ? :的情况下,堆内存中一个对象的成员变量被线程修改大概需要以下6个步骤:
1、rea# a E M n O x Y :d(读取):从堆内存中的读取要操作的变量;
2、load(载入):将读取的变量拷贝到线程栈内存;
3、use(使用):将栈内P E [ _ V _ |存中的变量值传e . & ) ^ , u J递给; J ! [ ~执行引擎;
4、assign(赋值):* j 1 \ 4 J v r将从执行引擎得到的结果赋值给栈内存中变量;
5、store(存储):将变更后的栈内存中的变量值传递到主内存;
6、write(写入):变更主内存中的变量值,此时新值对所有线程可见;
由此可f / s Y G 6 f K见,每个线程都可以按这几个步骤m u L W a { I并行操作同一个共享变量。可想而知,如果没有任何同步措施,那么在多线程l l N Y c环境下,该共享变量的值将变得飘忽不定,很难得到最终正确的结果。而这就是所谓的线程安全问题,也是我们在使用多线程编* P ?程时,最需要关注的问题!
线程池的使用
在实际场景中,多线程的使用并不是单打C a # 9 q Q a F E独斗r l 1 G b 9,线程作为宝贵的系统资源,其创建和销毁都需要耗费一定的系统资源;而无限制的创建线程资源,也会导致系统资源的耗尽。所以– 0 e E M * 9,为了重复使用线程资源、限制线程的创建行为,一般都会通过线程池来实现。以Java Web服务中使用最广i 8 & B ; } +的Tomcat服务器举例,为了并行处理网络请求就使用j V / S b g了线程池,源码示例如下:
- publicbooleanprocessSocket(SocketWrapperBa\ w 8se<S>socketWrapper,
- SocketEventevent,booleandispatch){
- try{
- if(socketWrapper==null){
- returnfalse;
- }
- SocketProcessorBase<S>sc=null;
- if(processorCache!=null){
- sc=processorCache.pop();
- }
- if(sc==null){
- s$ . _c=createSocket, h 6Processor(socketWrapper,L Q 9 Z jevent);
- }else{
- sc.reset(socketWrapper,eve7 g [ 9nt);
- }
- /I , m/这里通过线程池对线程执行进行管理
- Executorexecutor=getExecutor();
- if(dispatc9 = ` { y X { Qh&&execua ) 6 a w N ]tor6 ~ h f y V!=null){
- executor.execute(sc);
- }else{
- sc.rM = D C 1 a P vun();
- }
- }catch(RejectedExecutionExceptionree){
- getLd ] ? - i D . Z 4og().warn(si ^ Wm.getString("enl f @ = j vdpoint.exe; g Y { , 6 scutor.fail",socketWrapper),ree);
- returnfalse;
- }catch(Throwabls % 3 Z I { K %et){
- ExceptionUtils.han_ [ V ZdleThrq p P ^ } }owable(t);
- //ThismeanswegotanOOMorsimilarcreatingathread,orthat, l ` M
- //thepoolanditsqueueaS 5 + ) + v E prefull
- getLog().error(sm.getString("endpoint.process.fail"),t);
- returnfalse;
- }
- returntrue;
- }; C = 6 ) \
上述代码为Tomcat源码使用线程池并发处理网络请求的示例,这里以Tomcat为例,主要是因为基于Sprip $ q q l i z tng Boot、Spring MVC开发的Weu I 7b服务大都运行在Tomcat容器,而对于线程、线程池使用的复杂度都被屏蔽在中间件和框架中了,所以很多同学虽然写了不少Java代码,但在业务研发中I x C b额外使用h w N b % d G ! p线程的场景可能并不多,举这个例子的目的就是为了提升下并发编程的意识!
在Java中使用线程池的主要^ B a p方式是Executor框架,该框架作为JUC并发包的一部分,为Java程序提供了一个灵活v b R )的线程池实现。其逻辑层次如下图所示:
如图9 c |所示,使用Executor框架,既可以通过直接自定义配置、扩展Thre# 6 9 B % s + ]adPoolExecutor来创建一个线程池,也可以通过Executors类直接调用“newSing& g x 6 * V #leThreadExecutor()、newFixedThreadPool()、newCachedThreadPool()”这三个方法来创建具有一定功能特征的线程池。0 v t g e 1
除此之外,也可以通过自定义配置、扩展ScheduledThreadPoolExecutor9 e } ? ,来创建一个具有周期性、定时功能的线程池,例如线; Q T 1 B h程10s后运行、线程每分钟运行一次等。同样,与ThreadPoolExecutor一样,如果不想自定义配置,也可以通过o + k K 7Executors类直接调用“nW g v q SewScheduledThreadPool()、newSingleThreadScheduledExecutor()”这两个方法来分别创建具备自动线程规模扩展能力和线程池中只允许有单个线程的特定线程池。
而ForU % n Z t O s hkJoinPool是jdk1.8以后新增的一种线程池实现类型,类似于Fork-Join框架所支持的功能。这Q V S是一种可以将一个大任务拆分成多个任务队列,并具体分配给e G ^ $ b F H不同线程处理的机制,而关键的特性在于,通过窃取算法,某7 Z P R 9 i K Z +个线程在执行完本队列任务后,可以窃取其他队列的任务进行执行,O $ ! =从而最大限度提高线程的利用效率。
在实际应用中,虽然可以通过Executors方便的创建单个线程、固定线程或具备自动收缩能力的线程池,但一般还是建议直接通过ThreadPoolExecutor或v 8 W i Z RScheduledThreadPoolExecutor自定义v P q配置,这主要是因为Executors默认创建的线程池,很多采用的是无界队列,例如LinkedBlockingQueue,这样线程| 7 ^ t ; q 5 i就可以被无限制的添加都线程池的任务执行队列,如果请求量过大容易造成OOM。
接下来以一个实际的例子来演示通过ThrG ~ 2 _ ` AeadPoolExecutor如何自定义配置一个业务L V N线程池,具体如下:
1)、配置一个线程池类
- publicfinalclassSingleBlockE k % k + E :PoolExecutor{
- /**
- *自定义配置线程池(线程池核心线程数、$ y H z最大线程数、存活时间设置、采用的队列类型、线程工厂类、线程池拒绝处理类)
- */
- privatefinalThreadPoolExecutorpool=newThreadPoolExecutor(30,100,5,TimeUnit.MINUTES,
- newArrayBlockingQueue<Runnable>(100),newBloc= . ) ` ` )kThreadFactory(),newBlockReM , m ` 5 z i E @jectedExeH b s G X 4c= , v + ] Y O V EutionHandler());
- publicThreadP% 9 s \ 6 Y r uoolExecutorgetPool- R x t . 7 i g m(){
- returZ k Nnp\ 7 6 ? t ] ~ =ool;
- }
- privateSingleBlockPoolExecutor(){
- }
- /**
- *定义线程工厂
- */
- publicstaticclassBlockTh+ n I L dreadFactoryimplementsThreadFactory{
- privateAtomicIntegercount=newAtomicInteger(0);
- @OW M C } a Cverride
- publicThreadnewThread(Runnabler){
- ThreadZ q p ? 3t=newThread(r);
- StrinE S t ggthreadName=SingleBlockPoolExecutor.class.getSimpleName()+"-"+count.addAndGet(1);
- t.setName(threadName);
- returnt;
- }
- }
- /**
- *定义线程池拒绝机制处理类
- */
- publicstaticclassBlockRejectedExecutionHandlerimplementv s D q m S isRejectedExe~ O z :cutionHandler{
- @Override
- publicvoidrejecN 2 j n S H KtedExecution(Runnabler,ThreadPoolExecutorexecutor){
- try{
- //被拒线程再次返回阻塞队列进行等待处理
- executor.getQueue().pg s J V S &ut(r);
- }catch(InterruptedExceptione){
- ThreaJ ? O W U 6d.currentThread().interrupt();j x & [
- }
- }
- }
- /**
- *在静态内部类中持有单例类的实例,并且可直接被初始化
- */
- privatestaticclassHolder{
- privatestaticSingleBlockPoolExecutorinstance=newSingleBlockPoolExecutor();
- }~ 5 , e c 0 .
- /**
- *调用getInsB [ ! G ` &tance方法,A O , ( : ( T u x事实上是获得Hold} T M T v % Ger的instance静态属性L D L G o
- *
- *@return
- */
- publiu N [ } }cstaticSingleBlockPoolExecutorgetInstance(){
- returnHold] A h # $ z T p :er.instance;
- }
- /**
- *线程池q I 1 d销毁方法
- */
- publicvoiddestroy(){
- if(poolf ^ P q 9!=null){
- //线程池销毁
- pool.shutdownNow();\ O s
- }
- }
- }
如上述代码所示,通过单例模式配置了一个线程池。在对ThreaC ! P 1 D cdPoolExecutor的配置中,需要设置“核心E 3 | 0 N 2 { Z线程数、最大线程数、存活时间设置、采用的队列类型、线程工厂类、线程池拒绝处理类”,这几个核心参数。
2)、定义系统全局线程池管理类
- publicclar D Q y S 5 OssAsyncManager{
- /**
- *任务处理公共线程池
- *t ? 3 a X 8 R #/
- publicstaticfinalExecutorS2 ? 1 P ] a = ferviceservice=Sinx o & n cgleBlockPoolExecutor.getInstance().getPool();
- }
在应用中,除了框架定义的线程池外,如果自定义线程池,为了方便统M J i X 3 O 5 ( (一管理和使用,可以建立一个全局管理类,如上所示,该类通过静态变量的方式初始化了前面我n * L 2 ! 8 f们所定义的线程池。
3)、业务中使用
- @Serviceu b |
- @Slf4j
- publiccla) / , 7 KssOrderServiceIq \ ~ Mmplimplemenw ` A # - - ztsOrderService{
- @p 2 E a | [ ] tOverride
- publicCreateOrderBOcreatz 1 d j _ c DeOrder(CreateOrderDTOcreateOrd* j N 5 R 9 RerDTO){
- //1、同步处理核心业务逻辑
- log.info3 ` U Z ~ f a b("同步处理业务逻辑");
- //2/ d | * j 9 h、通过线程池提交,异步处理非核心逻辑,例如日志埋2 & c !点
- AsyncManager.service.execute(()->{
- System.out.println("线程->"+Thread.currentThread().getName()+",正在执行异步日志处理任务");
- });
- reJ 8 k |turnCreateOrderBO.builder().result(true).build();
- }
- }
如上代码所示,业务中需要通过线程池异步处理时,| 2 8 e可以通过线程池管理类获取对应的线程池,并向其提交执行! K h线程任务。
Futur4 5 } [ peTask实现异5 c t t 3 x步结果返回
在使用Thread或Runnable实现的线程处理中,一般是不能返回线程处理X X N Y ?结果的。但如果希望在调用线程异步处A S z } a R * B理完成后,能够获得线程异步处理的结果,那么就可以通过F( y wutureTask框架实现。示例代码如B ~ W f x @ G z下:
- @Service
- @Slf4j
- publicclassOrde) \ A $ q n 1 % NrServiceImplimplementsOrderService{
- @Override
- publicCreateOrderBOcreateOrder(CreateOrderDTOck e ~ Q L { b ?reateOrderDTO){
- //Future[ Q I ;异步处理返回执行结果
- //定义接收线程执行结果的FutureTask对象
- List<Future] : a G U<Integer>>results=Collections.synchronizedList(newArrayLi| s w @ Q n ,st<>());
- //实现CallabT | I Ale接口定义线程执行逻辑
- results.add(AsyncManagA 5 D m D N d uer.service.subm] / k F 4 nit(newCallablT H { ?e<Integer>(){
- @Override
- publicIntegercall()throwsException{
- inta=1,b=2;
- System.8 ? { R ? 6 9out.println("Callable接口执行中");
- returna+b;
- }
- }));
- //输出线程返回结果
- for(Future<Integer>future:results){
- try{
- //这里获取结果,等待时间设置200毫秒~ w 5 a $ k )
- System.out.println("a+b="+future.get(200,TimeUnit.MILLISECONDS));
- }catc[ n \ + 9 E ~ ~h(Interr. Q ) c 7 4 G % buptedExceptione){
- e.printStackTrace()\ l C $ w C } B :;
- }catch(ExecutionExceptione){
- e.printStackTrace();
- }catch(TimeoutExc) 8 weptione){
- e.printStackTrace();
- }
- }# s M
- //判断线程是否执行完毕,完毕则获取执行结果
- returnCreate( h ] + ]OrderBO.bui~ u - \ q ( I elder().resul$ 1 N ;t(true).build();_ [ R 2 N x , q $
- }
- }
如上述X @ 8 % .代码,如果希望线程返回执行结果,那么可以G A @ C A /通过实现Callable接口定义线程类,并通过Futur# i u w NeTask接收线程处理结果。不过在实际_ 5 Y q b M Z F使用时,需要注意线程暂时未执行完成情况下的业务处理逻辑。
CountDownLatch实现线程并行同步
在并发编程中| B s u .,一个复杂的业务逻辑可以通过多个线程并发执行来提高速度;但如果需要同步等待这些线程执行完后才能进行后续的& c l |逻辑,那么就可以通过CountDownLatch来实现对多个线程执行的同步汇聚。其逻辑示意图如下:
从原理上看CountDownLatch实际上j t S P P ^ P是在其内部创建并维护了一B : N ( 5 a {个volatile类型的整数计数器,当调用countDown()) ; o方法时,会尝试将整数计数器-1,当调用# ! 7 ^ )wait(; L e a : ? [)方法时,当前线程就会判断整数计数器是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入阻塞状态,直到某个线程将计数器设置为0,d 1 2 : x才会唤醒在aA E ( + rwait()方法中等待的线程继续执行。
常见的代码使用示例l ~ :如下:
1)、创建执行具体业务逻辑的线程处理类
- publicclassDataDeali K h m G q ] jTaskimpZ : $ W [ ) t B llementsRunnable{
- privateList<Integer>list;
- pri6 M C 6 ( J U AvateCountDownLatc@ & Bhlatch;
- publicDataDD : + | Z I Q E WealTask(List<Integer>list,CountDownLatchlatch){
- this.list=list;
- this.latch=latch;
- }
- @Override
- publicvoidrun(){
- try{
- System.out.println("线程->"+Thread.currentThread().getName()+",处理"+list.size()W - ? L u);
- }finally{
- //处理完计数器递减
- latch.countDown();
- }
- }
- }
该线程处理类,在实例化时接收除了待处理数据参数外,还会接收CountDownLatch对象,在执行完线程逻I t 1 F辑,注意,无论成功或失败,都需要调用countDown()方法。
2)、具体的使用方法
- @Servg T 5 u R Jice
- @Slf4j
- pubm ( Z #licclassOrderServiceImplimplementsOrdez c e & j O U v 3rService{
- @Override
- publicCreateOrderBOcreateOrder(CreateOrderDTOcreateOrderDTO){
- //CountDownLatch的使用示例
- //模拟待处理数据生成
- Integer[]array={10,20,30,40,50,60,70,80,90,100,101,102};
- List<Integer>l@ r 7 : ~ Fist=newArrayList<&) w 1 g : d y Kgt;();
- Arrays.asList(array).n ] Nstream().map(o->list.add(o)).collect(Collectors.5 W + / [ Z btoList());
- //对数据进行分组处理(5条记录为1组)
- Map_ Y b $ . K j (<String,List<?>&gl M ) q C q ` ht;entityMap=this.groupListByAvg(list,6);
- //根据数据分组数) ~ 2量,确定同步计数器的值
- CountDownLatchlatch=newCountDownL y B ^ R aLatchj 7 [ k = _ W G(entityMap.size* ; 3 d I g r C());
- IM ] { f *terator<Entry<String,List<?>>>it=entityMap.entrySet().iterator();
- try{
- //将分组~ P S p { 0 {数据分批提交给不同线程处理
- while(it.hasNext()){
- DataDealTaskdataDealTask=newDataDealTask((Li( ^ + O g X 2st<Integer&@ v F O _ s Ugt;)it.next().getValue(),latch);
- AsyncManager.service.submit(dataDealTask);
- }
- //等待分批处理线程处理完成
- latch.await();
- }catch(InterruptedExcepI q Z ) U O rt0 P c : I z jione){
- e.printStackTrace();
- }
- returnCreateOrderBu b E 5O.builder().result(true).build();
- }
- }
如上所示代码,在业务逻辑中\ L : k t D )如果处理数据量多,则可以通过分w F ( J + d \ o组的方式并行处理,而等待所有线程处理完成后,再同步返回调用方。这种场景就可以通过CountDownLatch来实现同步!
CycliBarrier栅栏实现线程阶段性同步
CountDownLatch的功能主要是实现线程的一次性同步。而在实际的业务场景中也可能存在这样的情况,执行一个阶段性的任务,例如”阶段1->阶段2->阶段3->阶段4->阶段5″。那么在并发处理这个阶段性任务时,就要在每个阶G k H % P段设置栅栏,只有当所有线程执行到某个阶段点之后,才能继续推进下一个阶段任务的执行,其逻辑如图所示:
针对上述场景,就可以通过CycliBay O \ o R n 8 C qrT U c ; m 2 P drier来实现。而从实U W – V ] P现上看,CyclicBarrz _ Q ( } Rier使用了基于ReentrantLock的互斥锁实现;在CyclicBarrier的内部有一个计数器 count,当count不为0时,每个线程在到达同步点会先调用await方法将自己阻塞,并将计数器会减1,直到计数器减为0的时候,所有因调用await方法而被阻塞的线程就会被唤^ n # J k (醒继续执行。并进入下一轮阻塞,此时; S m V 5 Y s h V在nj G # * |ew CyclicBarrier(parties) 时设置的parties值,会被赋值给 count 从而实现复用。
例如,计算某个部门的员S / $ w H K \工工资,要求在所有员工工资都计算完之后才能进行下一步整合操作。其代码示例如下:
- @Slf4j
- @Service
- publicclass. j p | - f m +SalaryStatisticServiceImplimplf ( _ ? ; X p xementsSalaryStatisticService{
- /**
- *模拟部门员工存储数据
- */
- pubb H | ElicstaticMap<String,List<EmployeeSalaryInfo>c U H 6 0 3 6>employeeMap=Collectiond 0 a % % d 1s.synchronizedMap(newHashMap<>());
- static{
- EmployeeSalaryInfoemployeeA=newEmployeeSalaryInfo();
- employeeA.setEmployeeNo("100");
- employeeA.setBaseSalaryAmount(14 Q ^0000);
- employeeA.setSubsidyAmount(3000);h \ x ^ } - & ,
- EmployeeSalaryInfoemployeeB=newEmployN u - !eeSalaryInfo()G / u r M w y q;
- employeeB.setEmployeeNo(H ^ q & + ? @ p"101");
- employeeB.setBaseSalaryAmount(30000);
- employeX k L 4 W J beB.setSubsidyAmount(3000);
- List<EmployeeSalaryInfo>list=newArrU C [ayList<>();
- list.add(employeeA);
- list.add(employeeB);
- employeeMap.put("10",list);
- }
- @Override
- publiy s [ w \cStatistid z m , kcReportBOstatisticReport(StatisticReportt * 4 E nDTOstatisticReportDTO){
- /x z z/查询部门下所有员工信息(模拟)
- List<EmployeeSalaryInfo>employeeSalaryInf# , | V ^ Vos=employeeMap.get(statisticReportDTO.getDepartmentNo());
- if(employeeSalaryInfos==null){
- log.info("部门员工信息不存在");
- returnStatisticReportBO.buil} A = 4 _ 5der().build();
- }
- //定义统计总工资的安全变量
- AtomicIntegertotalSalary=newAtomicInteger();
- //开启栅栏(在各线程触发之后触发i 3 A 2 h s e +)
- CyclicBarriercyclicBarrier=newCyclicBarrier(e( @ o ~ \ *mployeeSalaryInfos.size(),newRunnable(){
- //执行顺序-B1(随机)
- //该线程不会阻塞主线程
- @Override
- publicvoidrun(){
- log.info("汇总已分别计算出的两个员工的工资->"+totalSalary* : _ u ! B Y I.get()+",执行顺序->B");
- }
- });
- //执行顺序-A
- for(EmployeeSalaryInfz 8 #oe:em\ / H } h ( \ployeeSalaryInfos){
- AsyncManager.service.su@ \ @ N L y |bmit(newCallable<Integer>(){
- @Override
- publicIntegercall(){
- inttotalAmount=e0 3 y # K K + c d.getSubsidyAmount(8 L ! i X b \ m)+e.getBaseSalf 9 f ! ) a W :aryAmountj 8 c B n , t F();
- los | 1 y 9 H r D Ng.info("计算出员工{}",e.getEmployeeNo()+"的工资->"+totalA( c {mount+",执行顺序->A"b d E k x W $);
- //汇总总工资
- totF e n : / ? 6alSalary.addAndGA k b p c k ? % `et(totalAmount);
- try{
- //等待其他线程同步
- cyclicBarrier.await();
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }catch(BrokenBarrierExceptione){
- e.printStackTrace();
- }
- returntotalAmount;
- }
- });s : 0
- }
- //执行顺序-A/B(之前或之后随机,totalSalary值不能保L 6 } S u O ^ Z 9证一定会得到,所以CyclicBarrier更适合无返回的可重复并6 h , N ) 4行计算)
- //封装响应参数
- Statist[ ? q j V & WicRepe D Fort$ I y - 1 z 2 p VBOstatisticReportBO=StatisticRe[ 1 & @portBO.builder().employeeCount(employeeSalaryInfos.size())
- .departmentNo(statisticReportDTO.getDepartmentNo())
- .salaryTotalAmount(totalSalary.gQ = ; q - @et()f h E / P j x F).build();
- log.info("封装接口响应参数,执行顺序->A/B");
- returnstatistix 7 + t E } 8cReportBO;
- }
- @Data
- publicstaticclassEmployeeSalaryInfo{
- /**
- *员工编号
- */
- privateStringemployeeNo;
- /**
- *基本工资
- */
- privatJ % + q h P +eId % Z = O UntegerbaseSalaryAmount;
- /**
- *补助金额
- */
- privateIntegersubsidyAmount;
- }
- }
上述代码的执行结果如下:
- [kPoolExecutor-1]c.w.c.s.impl.SalaryStatisticServiceImpl:计算出员工100的工资->13000,执行顺序-
- [kPoolExecutor-2]c.w.c.s.impl.SalaryStatisticServiceImpl:计算出员工101的工资->33000,执行顺序-
- [kPoolExecutor-2]c.w.c.s.impl.SalaryStat: 4 4 jisticServiceImpl:汇总已分别计算出的} v Z G两个员工的工资->46000,
- [nio-8080-exec-2]c.w.c.sW d B S 6 B ).impl.Sal* w _ J \ NaryStatisticServiceImpl:封装接口响应参数,执行顺序->A/B
从0 p 8 R上述结果可以看出,受CycliBarrier控制的线程会等待其他线程执行完成后同步向后执行,并且CycliBarrier并不会阻塞主线程,所以最后响应参数封装代码可能在CycliBarrier汇总线程之前执行,也可能在其之后执行,使用时需要注意!
Semaphore(信号量)限制访问资源的线程数
Semaphore可以实o R } v现对某个共享资源访问线程数的限制,实现限流功能。以停车场| p m c B线程为例,代码如下:
- @Service
- @Slf4j
- publicclassParkServiceImplimplementsParkService{
- /**
- *模拟停车场的车位数
- */
- privatestaK 2 ; ) T ; H jticSemaphorei L K s 3 ( J Nsemaphore=newSemaphore(2);
- @Override
- publicAccessParkBOaccessPark(AccessParkDTOaccessParkDTO){
- Asyh 6 } N k & W hncManaM G ^ L M h ( Wger.service.execute(()->{
- if(seN 2 : u { I @ 6maphore.availablePermits()==0){
- log.info(Thread.cu{ U 3rrentThre@ @ x p ? \ad().s J N H o j z $getName()+",车牌号->"+accessParkDTO.getCarNo()+"% ; & A,车位不足请耐心等待");) X M 5 5 v
- }else{
- try{
- //获取令牌尝试进入停车场
- semaphore.acquire();
- log.info(Thread.currentThread().getName()+",车牌号->"+accessParkDTO.getCarNo()+",成功进入停车场");
- //模拟车辆在停车场停留的时间(30秒)
- Thread.sleep(30000);
- //释放令牌,腾出a h !停车场车位
- semaphore.release();
- l& : e ,og.iV h L 7nfo(Thread.currentThe F S P s nread().getName(G x 1 G j n \)+",车牌号->"+accessParkDTO.getCarNo()+",驶出停车场");
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- }
- });
- //封装返回信息
- returnAccessParkBO.builder().carNo(accessParkDTO.getCarNo())
- .currentPositionCount(semaphor} W ( _ ^e.. X 8 K r b t ? javailablePermits())
- .isPermitAcce\ 3 a U e I ( Vss(semaphore.av. u W ( a / CailablePermits()>0?true:false).build();
- }
- }
上述代码模拟停车场有2车= P + w | C 5 Z [位,并且每辆车进@ s w入车场后会停留30秒,然后并行模拟3次停车请求,具体执行效果如下:
- [kPoolExecutor-1]c.w.c.service.impl.ParkServiceImW S Opl:SingleBlockPoolExecutorf | H 6-1,车牌号-&m _ [ ; ~gt;10,成功进入停车R 0 1 % ] ` 8场顺序->A
- [kPoolExecutor-2]c.w.c.service.impl.ParkServiceImpl:SingleBlockPooM N i : JlExecutor-2,车X 3 8 ) - g T n t牌号->20,成功进入停车场顺序->A
- [kPoolExecutor-3]c.w.c.service.impl.ParkServiceImpl:Singld ~ O ! HeBlockPoo^ } G p j M b $ IlExecutor-3,车牌号->` n v ? f 0 $30,车位不足请耐心等待? 8 N # o N = l00,执行顺序->B
- [kPoolExecutor-1]c.w.c.service.impl.ParkServiceImpl:SingleBlockPoolExecutor-1,车e 1 Q \ O牌号->10,驶出停车场
- [kPoolExecutor-2]c.w.c.serv6 g ]ice.impl.A a ^ d x ` = r iParkServiceImpl:SingleBlockPoolExecutor-2,车牌号->20,驶出停车场
- [kPoolExecutor-4]c.w.c.service.impl.ParkServiceImpl:SingleBlockPoolExecutor-4,车牌号-$ 4 ? _ :>X c L30,成功进入停车场
可以看到由于通过Semaphore限制了可允许进入的线程数是2个,所以第三次V = \ + v K请求会被拒绝,直到前两次请求通过.rel~ t X l p 8 X lease()方法释放证书后第4次请求才会被允许进入!
后记
本文从应用层面总结了,JVM基本的内存9 b 0模型以及线程对共享内存操作的原子方式,并着重介绍了线程池、FutrueTask、CountDownLatch、CycliBarrier以及Semaphore这几种在Java并发编程中经常使用的JUC工具类。
原创文章,作者:2512141116@qq.com,如若转载,请注明出处:https://www.6fzz.com/1580.html