并发编程
2023-05-14 13:50:21 0 举报
AI智能生成
登录查看完整内容
并发编程
作者其他创作
大纲/内容
进程是`操作系统分配资源`的最小单元进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,是操作系统进行资源分配和调度的一个独立单位
进程
线程是`操作系统调度`的最小单元线程是进程的一个实体,是CPU调度和分派的基本单位
线程
1、进程基本上相互独立的,而线程存在于进程内,是进程的一个子集
2、进程拥有共享的资源,如内存空间等,供其内部的线程共享
- 同一台计算机的进程通信称为 `IPC(Inter-process communication)【进程间通信】`
- 不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP
3、进程间通信较为复杂
4、线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量
5、线程更轻量,线程上下文切换成本一般上要比进程上下文切换低
进程`在执行时通常拥有`独立的内存单元`,而线程之间可以共享内存
两者不同
进程线程
管道可用于具有亲缘关系的父子进程间的通信,有名管道除了具有管道所具有的功能外,它还允许无亲缘关系进程间的通信。
管道(pipe)及有名管道(named pipe)
信号是在软件层次上对中断机制的一种模拟,它是比较复杂的通信方式,用于通知进程有某事件发生,一个进程收到一个信号与处理器收到一个中断请求效果上可以说是一致的
信号(signal)
消息队列是消息的链接表,它克服了上两种通信方式中信号量有限的缺点,具有写权限得进程可以按照一定得规则向消息队列中添加新信息;对消息队列有读权限得进程则可以从消息队列中读取信息。
消息队列(message queue)
可以说这是最有用的进程间通信方式。它使得多个进程可以访问同一块内存空间,不同进程可以及时看到对方进程中对共享内存中数据得更新。这种方式需要依靠某种同步操作,如互斥锁和信号量等。
共享内存(shared memory)
主要作为进程之间及同一种进程的不同线程之间得同步和互斥手段。
信号量(semaphore)
这是一种更为一般得进程间通信机制,它可用于网络中不同机器之间的进程间通信,应用非常广泛
套接字(socket)
进程间的通讯方式
在同一时刻,有多个指令在多个CPU上同时执行。
并行
在同一时刻,有多个指令在单个CPU上交替执行
并发
并发并行
进程与线程
当一个线程修改了共享变量的值,其他线程能够看到修改的值
退不出的循环
① jvm层面, storeLoad内存屏障 (x86会使用 lock 替代 mfence)
② 上下文切换(上下文切换会将数据写入主存)
归根结底Java保证可见性就分为两种 :
通过 volatile 关键字保证可见性。
// JVM的内存屏障Unsafe.getUnsafe().storeFence();
通过 手动内存屏障保证可见性。
通过 synchronized 关键字保证可见性 【synchronized 底层会调用内存屏障】
通过 Lock保证可见性
通过 final 关键字保证可见性
如何保证可见性 ?
可见性
即程序执行的顺序按照代码的先后顺序执行,JVM 存在指令重排,所以存在有序性问题
主频的概念大家接触的比较多,而 CPU 的 Clock Cycle Time(`时钟周期时间`),等于主频的倒数,意思是 `CPU 能够识别的最小时间单位`,比如说 4G 主频的 CPU 的 Clock Cycle Time 就是 0.25 ns,作为对比,我们墙上挂钟的Cycle Time 是 1s
Clock Cycle Time
指令平均时钟周期数
CPI(Cycles Per Instruction)
CPI 的倒数,表示每个时钟周期能够运行的指令数
PCI(Instruction Per Clock Cycle)
程序的 CPU 执行时间,即我们前面提到的 user + system 时间,可以用下面的公式来表示程序 CPU 执行时间 = 指令数 * CPI * Clock Cycle Time
CPU执行时间
名词
指令重排序优化
支持流水线的处理器
JVM能根据处理器特性(CPU多级缓存系统、多核处理器等)适当的对机器指令进行重排序,使机器指令能更符合CPU的执行特性,最大限度的发挥机器性能。在编译器与CPU处理器中都能执行指令重排优化操作
指令重排
通过 内存屏障保证可见性。
通过 synchronized关键字保证有序性。
通过 Lock保证有序性。
如何保证有序性 ?
有序性
一个或多个操作,要么全部执行且在执行过程中不被任何因素打断,要么全部不执行,在 Java 中,对基本数据类型的变量的读取和赋值操作是原子性操作(64位处理器)
不会
单线程
会
多线程
答案
在 32 位的机器上对 long 型变量进行加减操作是否存在并发隐患?
原子性
并发三点特性
Java虚拟机规范中定义了Java内存模型(Java Memory Model,JMM),用于屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的并发效果,JMM规范了Java虚拟机与计算机内存是如何协同工作的:规定了一个线程如何和何时可以看到由其他线程修改过后的共享变量的值,以及在必须时如何同步的访问共享变量。JMM描述的是一种抽象的概念,一组规则,通过这组规则控制程序中各个变量在共享数据区域和私有数据区域的访问方式,JMM是围绕原子性、有序性、可见性展开的。
JMM定义
Java内存模型与硬件内存架构之间存在差异。硬件内存架构没有区分线程栈和堆。对于硬件,所有的线程栈和堆都分布在主内存中。部分线程栈和堆可能有时候会出现在CPU缓存中和CPU内部的寄存器中。如下图所示,Java内存模型和计算机硬件内存架构是一个交叉关系:
JMM与硬件内存架构的关系
lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态。
unlock(解锁):作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。
关于主内存与工作内存之间的具体交互协议,即一个变量如何从主内存拷贝到工作内存、如何从工作内存同步到主内存之间的实现细节,Java内存模型定义了以下八种操作来完成:
如果要把一个变量从主内存中复制到工作内存,就需要按顺寻地执行read和load操作, 如果把变量从工作内存中同步回主内存中,就要按顺序地执行store和write操作。但Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行。
不允许read和load、store和write操作之一单独出现
不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到主内存中。
不允许一个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中。
一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load或assign)的变量。即就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作。
一个变量在同一时刻只允许一条线程对其进行lock操作,但lock操作可以被同一条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。lock和unlock必须成对出现
如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值
如果一个变量事先没有被lock操作锁定,则不允许对它执行unlock操作;也不允许去unlock一个被其他线程锁定的变量。
对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)。
Java内存模型还规定了在执行上述八种基本操作时,必须满足如下规则
内存交互操作
单线程程序不会出现内存可见性问题
单线程程序
正确同步的多线程程序的执行将具有顺序一致性
正确同步的多线程程序
JMM为它们提供了最小安全性保障:线程执行时读取到的值,要么是之前某个线程写入的值,要么是默认值未同步程序在JMM中的执行时,整体上是无序的,其执行结果无法预知。 JMM不保证未同步程序的执行结果与该程序在顺序一致性模型中的执行结果一致。
1)顺序一致性模型保证单线程内的操作会按程序的顺序执行,而JMM不保证单线程内的操作会按程序的顺序执行,比如正确同步的多线程程序在临界区内的重排序。
2)顺序一致性模型保证所有线程只能看到一致的操作执行顺序,而JMM不保证所有线程能看到一致的操作执行顺序。
3)顺序一致性模型保证对所有的内存读/写操作都具有原子性,而JMM不保证对64位的long型和double型变量的写操作具有原子性(32位处理器)
未同步程序在JMM中的执行时,整体上是无序的,其执行结果无法预知。未同步程序在两个模型中的执行特性有如下几个差异。
未同步/未正确同步的多线程程序
按程序类型,Java程序的内存可见性保证可以分为下列3类
JMM内存可见性保证
JMM(Java内存模型)
保证可见性
保证有序性
原理
对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入
对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子(基于这点,我们通过会认为volatile不具备原子性)。volatile仅仅保证对单个volatile变量的读/写具有原子性,而锁的互斥执行的特性可以确保对整个临界区代码的执行具有原子性。
对volatile修饰的变量的读写操作前后加上各种特定的内存屏障来禁止指令重排序来保障有序性
特性
当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存。
当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效,线程接下来将从主内存中读取共享变量。
volatile写-读的内存语义
volatile修饰的变量的read、load、use操作和assign、store、write必须是连续的,即修改后必须立即同步回主内存,使用时必须从主内存刷新,由此保证volatile变量操作对多线程的可见性
JMM内存交互层面实现
通过lock前缀指令,会锁定变量缓存行区域并写回主内存,这个操作称为“缓存锁定”,缓存一致性机制会阻止同时修改被两个以上处理器缓存的内存区域数据。一个处理器的缓存回写到内存会导致其他处理器的缓存无效
硬件层面实现
volatile可见性实现原理
JVM中的字节码解释器(bytecodeInterpreter),用C++实现了JVM指令,其优点是实现相对简单且容易理解,缺点是执行慢
字节码解释器实现
模板解释器(templateInterpreter),其对每个指令都写了一段对应的汇编代码,启动时将每个指令与对应汇编代码入口绑定,可以说是效率做到了极致。
模板解释器实现
x86处理器中利用lock实现类似内存屏障的效果。(lock性能比mfence高)
在linux系统x86中的实现
volatile在hotspot的实现
① 确保后续指令执行的原子性。在Pentium及之前的处理器中,带有lock前缀的指令在执行期间会锁住总线,使得其它处理器暂时无法通过总线访问内存,很显然,这个开销很大。在新的处理器中,Intel使用缓存锁定来保证指令执行的原子性,缓存锁定将大大降低lock前缀指令的执行开销。
② LOCK前缀指令具有类似于内存屏障的功能【不是内存屏障指令】,禁止该指令与前面和后面的读写指令重排序。
③ LOCK前缀指令会等待它之前所有的指令完成、并且所有缓冲的写操作写回内存(也就是将store buffer中的内容写入内存)之后才开始执行,并且根据缓存一致性协议,刷新store buffer的操作会导致其他cache中的副本失效(重新从主存加载)。
lock前缀指令的作用
-XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly -Xcomp
执行demo添加 vm 参数
验证了可见性使用了lock前缀指令
汇编层面volatile的实现
(1)有保证的原子操作
(2)总线锁定,使用LOCK#信号和LOCK指令前缀
(3)缓存一致性协议,确保原子操作可以在缓存的数据结构上执行(缓存锁);这种机制出现在Pentium 4、Intel Xeon和P6系列处理器中
《64-ia-32-architectures-software-developer-vol-3a-part-1-manual.pdf》 如下描述32位的IA-32处理器支持对系统内存中的位置进行锁定的原子操作。这些操作通常用于管理共享的数据结构(如信号量、段描述符、系统段或页表),在这些结构中,两个或多个处理器可能同时试图修改相同的字段或标志。处理器使用三种相互依赖的机制来执行锁定的原子操作:
从硬件层面分析Lock前缀指令
volatile内存语义
dcl 的实例为什么要加 valitale ?
dcl(double-checked locking)
volatile禁止重排序场景:1. 第二个操作是volatile写,不管第一个操作是什么都不会重排序2. 第一个操作是volatile读,不管第二个操作是什么都不会重排序3. 第一个操作是volatile写,第二个操作是volatile读,也不会发生重排序
volatile重排序规则
1. 在每个volatile写操作的前面插入一个StoreStore屏障2. 在每个volatile写操作的后面插入一个StoreLoad屏障3. 在每个volatile读操作的前面插入一个LoadLoad屏障4. 在每个volatile读操作的后面插入一个LoadStore屏障
JMM内存屏障插入策略
LoadLoad屏障:(指令Load1; LoadLoad; Load2),在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
LoadStore屏障:(指令Load1; LoadStore; Store2),在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
StoreStore屏障:(指令Store1; StoreStore; Store2),在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
StoreLoad屏障:(指令Store1; StoreLoad; Load2),在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。它的开销是四种屏障中最大的。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能
JVM层面的内存屏障
1. lfence,是一种Load Barrier 读屏障
硬件层提供了一系列的内存屏障 memory barrier / memory fence(Intel的提法)来提供一致性的能力。拿X86平台来说,有几种主要的内存屏障:
1. 阻止屏障两边的指令重排序
对Load Barrier来说,在读指令前插入读屏障,可以让高速缓存中的数据失效,重新从主内存加载数据;对Store Barrier来说,在写指令之后插入写屏障,能让写入缓存的最新数据写回到主内存。Lock前缀实现了类似的能力,它先对总线和缓存加锁,然后执行后面的指令,最后释放锁后会把高速缓存中的数据刷新回主内存。在Lock锁住总线的时候,其他CPU的读写请求都会被阻塞,直到锁释放。不同硬件实现内存屏障的方式不同,Java内存模型屏蔽了这种底层硬件平台的差异,由JVM来为不同的平台生成相应的机器码。
2. 刷新处理器缓存/冲刷处理器缓存
内存屏障有两个能力
硬件层内存屏障
volatile 有序性问题
程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作;锁定规则:一个unLock操作先行发生于后面对同一个锁的lock操作;volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C;线程启动规则:Thread对象的start()方法先行发生于此线程的每个一个动作;线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生;线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行;对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始;
happens-before
在多核处理器架构上,所有的处理器是共用一条总线的,都是靠此总线来和主内存进行数据交互。当主内存的数据同时存在于多个处理的高速缓存中时,某一处理器更新了此共享数据后。会通过总线触发嗅探机制来通知其他处理器将自己高速缓存内的共享数据置为无效,在下次使用时重新从主内存加载最新数据。而这种通过总线来进行通信则称之为”缓存一致性流量“。因为总线是固定的,所有相应可以接受的通信能力也就是固定的了,如果缓存一致性流量突然激增,必然会使总线的处理能力受到影响。而恰好CAS和volatile 会导致缓存一致性流量增大。如果很多线程都共享一个变量,当共享变量进行CAS等数据变更时,就有可能产生总线风暴切勿滥用volatile
总线风暴?
内存(JMM)
继承Thead
实现Runnable
实现Callable(借助FutureTask)
线程池
本质上Java中实现线程只有一种方式,都是通过new Thread()创建线程,调用Thread#start启动线程最终都会调用Thread#run方法
实现多线程的四种方式
Thread和Runable的关系
run start区别
try - catch 处理
Future
线程内如何处理异常
Thread类的继承关系
成员变量
Thread 源码解析
创建运行线程
栈与栈帧
- 线程的 cpu 时间片用完
- 垃圾回收(STW)
- 有更高优先级的线程需要运行
- 线程自己调用了 `sleep、yield、wait、join、park、synchronized、lock` 等方法
线程的上下文切换/程序计数器
Java 线程只是一个对象,在调start()方法的时候才会取关联操作系统创建线程并映射
操作系统创建线程的过程: Thread【java】-> JavaThread 【jvm】-> Thread【os】
源码分析
Thread#start()源码分析
CPU(中央处理单元)从一个进程或线程到另一个进程或线程的切换。
内核(即操作系统的核心)对CPU上的进程(包括线程)执行以下活动:1、暂停一个进程的处理,并将该进程的CPU状态(即上下文)存储在内存中的某个地方2、从内存中获取下一个进程的上下文,并在CPU的寄存器中恢复它3、返回到程序计数器指示的位置(即返回到进程被中断的代码行)以恢复进程。
详细流程
内核模式是CPU的特权模式,其中只有内核运行,并提供对所有内存位置和所有其他系统资源的访问。其他程序(包括应用程序)最初在用户模式下运行,但它们可以通过系统调用运行部分内核代码
内核模式和用户模式
上下文切换只能在内核模式下发生
上下文切换是多任务操作系统的一个基本特性
就CPU时间而言,上下文切换对系统来说是一个巨大的成本,实际上,它可能是操作系统上成本最高的操作。因此,操作系统设计中的一个主要焦点是尽可能地避免不必要的上下文切换。与其他操作系统(包括一些其他类unix系统)相比,Linux的众多优势之一是它的上下文切换和模式切换成本极低。
上下文切换通常是计算密集型的
特点
vmstat 1 #可以看到整个操作系统每1秒CPU上下文切换的统计
其中cs列就是CPU上下文切换的统计。当然,CPU上下文切换不等价于线程切换,很多操作会造成CPU上下文切换:1. 线程、进程切换2. 系统调用3. 中断
命令查看CPU上下文切换情况【Linux】
上下文切换
Java线程属于内核级线程
线程运行原理
基于线程之上,但又比线程更加轻量级的存在,协程不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行),具有对内核来说不可见的特性。这样带来的好处就是性能得到了很大的提升,不会像线程切换那样消耗资源
线程的切换由操作系统调度,协程由用户自己进行调度,因此减少了上下文切换,提高了效率。
线程的默认stack大小是1M,而协程更轻量,接近1k。因此可以在相同的内存中开启更多的协程。
不需要多线程的锁机制:因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多
特点【协程的特点在于是一个线程执行,那和多线程比,协程有何优势?】
协程适用于被阻塞的,且需要大量并发的场景(网络io)。不适合大量计算的场景
协程(Coroutines)
汇总
sleep
yeild
底层使用的 wait
join
二阶段终止【模式】
不会清除打断标记
打断正常程序
清楚打断标记
打断阻塞程序(sleep、wait ...)
interrupt
stop()方法太过于暴力,强行把执行到一半的线程终止
不推荐的方法
stop()方法会真的杀死线程。如果线程持有ReentrantLock锁,被stop()的线程并不会自动调用ReentrantLock的unlock()去释放锁。那其他线程就再也没机会获得ReentrantLock锁, 这样其他线程就再也不能执行ReentrantLock锁锁住的代码逻辑,类似的方法还有suspend()和resume()方法, 这两个方法同样也都不建议使用了, 所以这里也就不多介绍了
当线程A处于WAITING、 TIMED_WAITING状态时, 如果其他线程调用线程A的interrupt()方法,则会使线程A返回到RUNNABLE状态,同时线程A的代码会触发InterruptedException异常
异常
如果线程处于RUNNABLE状态,并且没有阻塞在某个I/O操作上,例如中断计算基因组序列的线程A,此时就得依赖线程A主动检测中断状态了。如果其他线程调用线程A的interrupt()方法, 那么线程A可以通过isInterrupted()方法, 来检测自己是不是被中断了
主动检测通知
interrupt()方法仅仅是通知线程,线程有机会执行一些后续操作,同时也可以无视这个通知。被interrupt的线程,有两种方式接收通知:一种是异常, 另一种是主动检测
stop & interrupt
常用方法
区别
所有线程轮流使用CPU的使用权,平均分配每个线程占用CPU的时间片
分时调度模型
优先让优先级高的线程使用CPU,如果线程的优先级相同,那么会随机选择一个,优先级高的线程获取的CPU时间片相对多一些
每个线程将由系统来分配执行时间,线程的切换不由线程本身来决定
抢占式调度模型【java默认】
线程调度
假如计算机只有一个CPU,那么CPU在某一个时刻只能执行一条指令,线程只有得到CPU时间片,也就是使用权,才可以执行指令。所以说多线程程序的执行是有随机性,因为谁抢到CPU的使用权是不一定的
随机性
方法
线程优先级
用户线程/主线程
setDaemon(booleanon [true])
守护线程
操作系统
Java线程是无法区分操作系统线程是在可运行状态和运行状态的,所有统一定义为 RUNABLE
操作系统层面的阻塞状态在Java中还是Runnable
Java
JVM
线程状态
导致当前线程等待,直到另一个线程调用该对象的notify()方法或notifyAll()方法(会释放锁)
wait
唤醒正在等待对象监视器的`单个`线程
notify
唤醒正在等待对象监视器的`所有线程`
notifyAll
API
工作原理
- sleep是Thread静态方法,wait是Object的方法
- wait需要配置synchronized的使用,sleep则不需要
- sleep不会释放锁,wait会释放锁
wait 和 sleep区别
正确使用方式
join 体现的是【保护性暂停】模式join就是监听等待线程的结束,底层调用了`wait`方法阻塞,等被等待线程执行完会自己调用`notifyAll`方法进行唤醒
join的原理
同步模式之保护性暂停
异步模式之生产者/消费者
wait & notify(线程通讯)
对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果
boolean cancel (boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
boolean isCancelled () 任务是否已经取消,任务正常完成前将其取消,则返回 true
boolean isDone () 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
核心方法
FutreTask(具体的实现)
当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制
Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来
Future 注意事项
并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
没有异常处理:Future接口中没有关于异常处理的方法;
Future的局限性
font color=\"#e74f4c\
内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果
实现类似于 Dubbo 远程调用的 Forking Cluster场景
使用
当需要批量提交异步任务的时候建议你使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单
CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列
应用场景总结
CompletionService
CompletionStage接口: 执行某一个阶段,可向下执行后续阶段。异步执行,默认线程池是ForkJoinPool.commonPool()
CompletableFuture是Future接口的扩展和增强CompletableFuture实现了对任务的编排能力 (串行[依赖]、并行、聚合的关系)
thenApply() 把前面异步任务的结果,交给后面的FunctionthenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回
描述依赖关系
thenCombine:任务合并,有返回值thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。
描述and聚合关系
applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。
描述or聚合关系
CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行
并行执行
应用场景
CompletableFuture 常用方法总结
创建异步操作
join()和get()方法都是用来获取CompletableFuture异步之后的返回值。font color=\"#e74f4c\
join & get
获取结果
结果处理
thenApply 和 thenCompose的区别thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture;thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生成一个新的CompletableFuture。
thenApply & thenCompose
结果转换
thenAccept系列:对单个结果进行消费thenAcceptBoth系列:对两个结果进行消费thenRun系列:不关心结果,只对结果执行Action
结果消费
合成两个任务的结果,一并处理thenCombine
结果组合
将两个线程任务获取结果的速度相比较,按一定的规则进行下一步处理
applyToEither: 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作acceptEither:两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作runAfterEither:两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果runAfterBoth:两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果anyOf:anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFutureallOf:allOf方法用来实现多 CompletableFuture 的同时返回
任务交互
核心功能
CompletableFuture (CompletionStage)
CompletableFuture
Futrue & CompletableFuture
volatile
sychronized + wait/notify/notifyAll
ReentrantLock + await/signal/sinalAll
cas + park/unpark
等待唤醒机制
管道是基于“管道流”的通信方式,管道输入/输出流要用于线程之间的数据传输,而传输的媒介为内存
基于字符的:PipedWriter、PipedReader
基于字节流的:PipedOutputStream、PipedInputStream
管道输入/输出流的体现
示例
管道输入输出流
底层 通过 wait
thread.join
Java线程间通讯(总结)
Java线程
系统创建一个线程的成本是比较高的,因为它`涉及到与操作系统交互`,当程序中需要创建大量生存期很短暂的线程时,频繁的创建和销毁线程对系统的资源消耗有可能大于业务处理是对系统资源的消耗,为了提高性能,我们就可以采用线程池
为什么需要?
线程池状态 【shutdown & stop】
构造方法
拒绝策略
普通任务
ExecutorService【ScheduleedFutureTask】 任务调度执行流程
执行流程
getTaskCount():线程池已执行和未执行的任务总数
getCompletedTaskCount():已完成的任务数量
getPoolSize():线程池当前的线程数量
getCorePoolSize():线程池核心线程数
getActiveCount():当前线程池中正在执行任务的线程数量
监控
ThreadPoolExecutor
newFixedThreadPool(int nThreads):
newCachedThreadPool()
newSingleThreadExecutor()
newSingleThreadScheduledExecutor()
newScheduledThreadPool()
Executors工具类
提交任务
Shutdown
ShutdownNow 【Stop】
isShutdown
isTerminated
关闭任务
所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个 任务的延迟或异常都将会影响到之后的任务。
Timer
任务执行的间隔时间,如果任务本身的执行时间超过的间隔时间就会以本身时间为主
scheduleAtFixedRate
任务的间隔时间
scheduleWithFixedDelay
任务调度线程池
配置
运行流程
Tomcat线程池
基本思想: 将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题性质相同。求出子问题的解,就可得到原问题的解
分解:将要解决的问题划分成若干规模较小的同类问题;求解:当子问题划分得足够小时,用较简单的方法解决;合并:按原问题的要求,将子问题的解逐层合并构成原问题的解。
在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题
分治思想在很多领域都有广泛的应用,例如算法领域有分治算法(归并排序、快速排序都属于分治算法,二分法查找也是一种分治算法);大数据领域知名的计算框架 MapReduce 背后的思想也是分治
分治算法
都会影响任务的执行效率一是无法对大任务进行拆分,对于某个任务只能由单线程执行;二是工作线程从队列中获取任务时存在竞争情况
传统线程池ThreadPoolExecutor缺点
为了解决传统线程池的缺陷,Java7中引入Fork/Join框架,并在Java8中得到广泛应用。Fork/Join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool允许其他线程向它提交任务,并根据设定将这些任务拆分为粒度更细的子任务,这些子任务将由ForkJoinPool内部的工作线程来并行执行,并且工作线程之间可以窃取彼此之间的任务
ForkJoinPool最适合计算密集型任务,而且最好是非阻塞任务
ForkJoinPool是ThreadPoolExecutor线程池的一种补充,是对计算密集型场景的加强。根据经验和实验,任务总数、单任务执行耗时以及并行数都会影响到Fork/Join的性能。所以,当你使用Fork/Join框架时,你需要谨慎评估这三个指标,最好能通过模拟对比评估
Fork/Join框架介绍
ForkJoinPool 是用于执行 ForkJoinTask 任务的执行池,不再是传统执行池 Worker+Queue 的组合式,而是维护了一个队列数组 WorkQueue(WorkQueue[]),这样在提交任务和线程任务的时候大幅度减少碰撞
int parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定工作线程的数量。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()来设置并行级别;ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过factory来创建。注意,这里需要实现的是ForkJoinWorkerThreadFactory,而不是ThreadFactory。如果你不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作;UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理;boolean asyncMode:设置队列的工作模式:asyncMode ? FIFO_QUEUE : LIFO_QUEUE。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式。
ForkJoinPool构造器
execute 类型的方法在提交任务后,不会返回结果。ForkJoinPool不仅允许提交ForkJoinTask类型任务,还允许提交Runnable任务invoke 方法接受ForkJoinTask类型的任务,并在任务执行结束后,返回泛型结果。如果提交的任务是null,将抛出空指针异常。submit 方法支持三种类型的任务提交:ForkJoinTask类型、Callable类型和Runnable类型。在提交任务后,将返回ForkJoinTask类型的结果。如果提交的任务是null,将抛出空指针异常,并且当任务不能按计划执行的话,将抛出任务拒绝异常。
按类型提交不同的任务
ForkJoinPool
ForkJoinTask是ForkJoinPool的核心之一,它是任务的实际载体,定义了任务执行时的具体逻辑和拆分逻辑。ForkJoinTask继承了Future接口,所以也可以将其看作是轻量级的Future
fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中
fork() — 提交任务
join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果
join() — 获取任务执行结果
通常情况下我们不需要直接继承ForkJoinTask类而只需要继承它的子类,Fork/Join框架提供了以下三个子类:RecursiveAction:用于递归执行但不需要返回结果的任务。RecursiveTask :用于递归执行需要返回结果的任务。CountedCompleter<T> :在任务完成执行后会触发执行一个自定义的钩子函数
ForkJoinTask最适合用于纯粹的计算任务,也就是纯函数计算,计算过程中的对象都是独立的,对外部没有依赖。提交到ForkJoinPool中的任务应避免执行阻塞I/O
ForkJoinTask使用限制
ForkJoinTask
Fork/Join的使用
ForkJoinPool 内部有多个工作队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个工作队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的工作队列中。ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的top,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从top取出任务来执行。每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务,窃取的任务位于其他线程的工作队列的base,也就是说工作线程在窃取其他工作线程的任务时,使用的是FIFO 方式。在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。在既没有自己的任务,也没有可以窃取的任务时,进入休眠
工作流程
ForkJoinPool与ThreadPoolExecutor有个很大的不同之处在于,ForkJoinPool存在引入了工作窃取设计,它是其性能保证的关键之一。工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争
工作窃取算法缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
主要原因是为了提高性能,通过始终选择最近提交的任务,可以增加资源仍分配在CPU缓存中的机会,这样CPU处理起来要快一些。而窃取者之所以从尾部获取任务,则是为了降低线程之间的竞争可能,毕竟大家都从一个部分拿任务,竞争的可能要大很多。此外,这样的设计还有一种考虑。由于任务是可分割的,那队列中较旧的任务最有可能粒度较大,因为它们可能还没有被分割,而空闲的线程则相对更有“精力”来完成这些粒度较大的任务
为什么设计工作线程总是从头部获取任务,窃取线程从尾部获取任务?
工作窃取
WorkQueue 是双向列表,用于任务的有序执行,如果 WorkQueue 用于自己的执行线程 Thread,线程默认将会从尾端选取任务用来执行 LIFO。每个 ForkJoinWorkThread 都有属于自己的 WorkQueue,但不是每个 WorkQueue 都有对应的 ForkJoinWorkThread。没有 ForkJoinWorkThread 的 WorkQueue 保存的是 submission,来自外部提交,在WorkQueues[] 的下标是 偶数 位。
工作队列WorkQueue
ForkJoinWorkThread 是用于执行任务的线程,用于区别使用非 ForkJoinWorkThread 线程提交task。启动一个该 Thread,会自动注册一个 WorkQueue 到 Pool,拥有 Thread 的 WorkQueue 只能出现在 WorkQueues[] 的 奇数 位。
ForkJoinWorkThread
ForkJoinPool 的工作原理
ForkJoin源码分析
ForkJoinPool执行流程
为什么不推荐使用jdk8并行流?
ForkJoin
CPU计算时间越长,线程的数量【在CPU核数的基础上】越少,IO操作越多,线程数量越多
CPU 密集型运算(以计算为主)
IO密集型
任务分类
如果想要更准确的话,可以进行压测,监控 JVM 的线程情况以及 CPU 的负载情况,根据实际情况衡量应该创建的线程数,合理并充分利用资源
线程池创建多少线程 ?
成员变量和静态变量是否线程安全?
局部变量是否线程安全?
线程安全分析
临界区 Critical Section
竞态条件 Race Condition
阻塞: synchronized、Lock
非阻塞: 原子类
解决方案
共享问题
synchronized
同步方法
同步代码块
synchronized是JVM内置锁,基于Monitor机制实现,依赖底层操作系统的互斥原语Mutex(互斥量),它是一个重量级锁,性能较低。当然,JVM内置锁在1.5之后版本做了重大的优化,如锁粗化(Lock Coarsening)、锁消除(Lock Elimination)、轻量级锁(Lightweight Locking)、偏向锁(Biased Locking)、自适应自旋(Adaptive Spinning)等技术来减少锁操作的开销,内置锁的并发性能已经基本与Lock持平。Java虚拟机通过一个同步结构支持方法和方法中的指令序列的同步:monitor。同步方法是通过方法中的access_flags中设置ACC_SYNCHRONIZED标志来实现;同步代码块是通过monitorenter和monitorexit来实现。两个指令的执行是JVM通过调用操作系统的互斥原语mutex来实现,被阻塞的线程会被挂起、等待重新调度,会导致“用户态和内核态”两个态之间来回切换,对性能有较大影响。
非公平
加锁字节码
概述
在管程的发展史上,先后出现过三种不同的管程模型,分别是Hasen模型、Hoare模型和MESA模型。现在正在广泛使用的是MESA模型
Java语言的内置管程synchronized
MESA模型
在获取锁时,是将当前线程插入到cxq的头部,而释放锁时,默认策略(QMode=0)是:如果EntryList为空,则将cxq中的元素按原有顺序插入到EntryList,并唤醒第一个线程,也就是当EntryList为空时,是后来的线程先获取锁(cxq FILO 栈结构)。_EntryList不为空,直接从_EntryList中唤醒线程
java.lang.Object 类定义了 wait(),notify(),notifyAll() 方法,这些方法的具体实现,依赖于 ObjectMonitor 实现,这是 JVM 内部基于 C++ 实现的一套机制。
hash: 保存对象的哈希码。运行期间调用System.identityHashCode()来计算,延迟计算,并把结果赋值到这里。
age: 保存对象的分代年龄。表示对象被GC的次数,当该次数到达阈值的时候,对象就会转移到老年代。
biased_lock: 偏向锁标识位。由于无锁和偏向锁的锁标识都是 01,没办法区分,这里引入一位的偏向锁标识位。
JavaThread*: 保存持有偏向锁的线程ID。偏向模式的时候,当某个线程持有对象的时候,对象这里就会被置为该线程的ID。 在后面的操作中,就无需再进行尝试获取锁的动作。这个线程ID并不是JVM分配的线程ID号,和Java Thread中的ID是两个概念。
epoch: 保存偏向时间戳。偏向锁在CAS锁操作过程中,偏向性标识,表示对象更偏向哪个锁。
32位JVM下的对象结构描述
ptr_to_lock_record:轻量级锁状态下,指向栈中锁记录的指针。当锁获取是无竞争时,JVM使用原子操作而不是OS互斥,这种技术称为轻量级锁定。在轻量级锁定的情况下,JVM通过CAS操作在对象的Mark Word中设置指向锁记录的指针。
ptr_to_heavyweight_monitor:重量级锁状态下,指向对象监视器Monitor的指针。如果两个不同的线程同时在同一个对象上竞争,则必须将轻量级锁定升级到Monitor以管理等待的线程。在重量级锁定的情况下,JVM在对象的ptr_to_heavyweight_monitor设置指向Monitor的指针
64位JVM下的对象结构描述
Mark Word的结构
Mark Word中锁标记枚举
运行时元数据【Mark World】
对象头的另外一部分是klass类型指针,即对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。 32位4字节,64位开启指针压缩或最大堆内存<32g时4字节,否则8字节。jdk1.8默认开启指针压缩后为4字节,当在JVM参数中关闭指针压缩(-XX:-UseCompressedOops)后,长度为8字节
类型指针【Klass Pointer】
数组长度(只有数组对象有)
普通对象
数组对象
Mark World
例子
对象头(Header)
相同宽度的字段分配在一起
父类分配的字段会在子类之前
如果 compectFeilds 设置为 true (默认 true),则子类的窄变量可以在放在父类变量的间隙
规则
实际数据(Instance Data)
由于虚拟机要求对象起始地址必须是8字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐
对其填充(Padding)
对象内存布局
<!-- 查看Java 对象布局、大小工具 --><dependency> <groupId>org.openjdk.jol</groupId> <artifactId>jol-core</artifactId> <version>0.10</version></dependency>
依赖
OFFSET:偏移地址,单位字节;
SIZE:占用的内存大小,单位为字节;
TYPE DESCRIPTION:类型描述,其中object header为对象头;
VALUE:对应内存中当前存储的值,二进制32位;
利用jol查看64位系统java对象(空对象),默认开启指针压缩,总大小显示16字节,前12字节为对象头
关闭指针压缩后,对象头为16字节:-XX:-UseCompressedOops
使用方法
JOL工具
Monitor(锁)【监视器/管程】
Monitor
场景
语法
流程
不行
轻量级锁是否可以降级为偏向锁?
轻量级锁不存在自旋
轻量级锁【线程间有轻微的竞争(线程交替执行)】
偏向锁 -> 轻量级锁
轻量级锁 -> 重量级锁
锁膨胀/锁升级
重量级锁竞争的时候,可以使用自旋来进行优化,轻量级锁不存在自旋自旋的目的是为了减少线程挂起的次数,尽量避免直接挂起线程(挂起操作涉及系统调用,存在用户态和内核态切换,这才是重量级锁最大的开销)
自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势
在 Java 6 之后自旋是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,比较智能。
Java 7 之后不能控制是否开启自旋功能
注意
锁自旋
当JVM启用了偏向锁模式(jdk6默认开启),新创建对象的Mark Word中的Thread Id为0,说明此时处于可偏向但未偏向任何线程,也叫做匿名偏向状态(anonymously biased)。只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有
偏向锁模式存在偏向锁延迟机制:HotSpot 虚拟机在启动后有个 4s 的延迟才会对每个新建的对象开启偏向锁模式。JVM启动时会进行一系列的复杂活动,比如装载配置,系统类初始化等等。在这个过程中会使用大量synchronized关键字对对象加锁,且这些锁大多数都不是偏向锁。为了减少初始化时间,JVM默认延时加载偏向锁。
偏向状态
默认:有偏向锁会采用偏向锁,如果其他线程用到了该对象就会撤销偏向锁,升级为轻量级锁,如果轻量级锁加了但是有竞争发生,这时候锁膨胀为重量级锁偏向锁 —> 轻量级锁—> 重量级锁
加锁顺序
偏向锁没有地方存储哈希码- 轻量级锁会在锁记录中记录 hashCode- 重量级锁会在 Monitor中记录 hashCode
调用对象hashCode
调用 wait/notify
撤销偏向锁
从偏向锁的加锁解锁过程中可看出,当只有一个线程反复进入同步块时,偏向锁带来的性能开销基本可以忽略,但是当有其他线程尝试获得锁时,就需要等到safe point时,再将偏向锁撤销为无锁状态或升级为轻量级,会消耗一定的性能,所以在多线程竞争频繁的情况下,偏向锁不仅不能提高性能,还会导致性能下降
以class为单位,为每个class维护一个偏向锁撤销计数器,每一次该class的对象发生偏向撤销操作时,该计数器+1,当这个值达到重偏向阈值(默认20)时,JVM就认为该class的偏向锁有问题,因此会进行批量重偏向。
每个class对象会有一个对应的epoch字段,每个处于偏向锁状态对象的Mark Word中也有该字段,其初始值为创建该对象时class中的epoch的值。每次发生批量重偏向时,就将该值+1,同时遍历JVM中所有线程的栈,找到该class所有正处于加锁状态的偏向锁,将其epoch字段改为新值。下次获得锁时,发现当前对象的epoch值和class的epoch不相等,那就算当前已经偏向了其他线程,也不会执行撤销操作,而是直接通过CAS操作将其Mark Word的Thread Id 改成当前线程Id。
批量重偏向
当达到重偏向阈值(默认20)后,假设该class计数器继续增长,当其达到批量撤销的阈值后(默认40),JVM就认为该class的使用场景存在多线程竞争,会标记该class为不可偏向(整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的),之后,对于该class的锁,直接走轻量级锁的逻辑。
注意:时间-XX:BiasedLockingDecayTime=25000ms范围内没有达到40次,撤销次数清为0,重新计时
批量撤销
批量重偏向(bulk rebias)机制是为了解决:一个线程创建了大量对象并执行了初始的同步操作,后来另一个线程也来将这些对象作为锁对象进行操作,这样会导致大量的偏向锁撤销操作。
批量撤销(bulk revoke)机制是为了解决:在明显多线程竞争剧烈的场景下使用偏向锁是不合适的。
-XX:BiasedLockingBulkRebiasThreshold 和 -XX:BiasedLockingBulkRevokeThreshold 来手动设置阈值
JVM的默认参数值
批量重偏向和批量撤销是针对类的优化,和对象无关
偏向锁重偏向一次之后不可再次重偏向
当某个类已经触发批量撤销机制后,JVM会默认当前类产生了严重的问题,剥夺了该类的新实例对象使用偏向锁的权利
总结
批量重偏向/ 批量撤销
偏向锁【不存在竞争】
锁消除
一系列的连续操作都会对同一个对象反复加锁及解锁,甚至加锁操作是出现在循环体中的,即使没有出现线程竞争,频繁地进行互斥同步操作也会导致不必要的性能损耗
锁粗化
锁对象状态转换
当一个对象在方法中被定义后,它可能被外部方法所引用,例如作为调用参数传递到其他地方中。
方法逃逸(对象逃出当前方法)
这个对象甚至可能被其它线程访问到,例如赋值给类变量或可以在其它线程中访问的实例变量。
线程逃逸((对象逃出当前线程)
逃逸分析,是一种可以有效减少Java 程序中同步负载和内存堆分配压力的跨函数全局数据流分析算法。通过逃逸分析,Java Hotspot编译器能够分析出一个新的对象的引用的使用范围从而决定是否要将这个对象分配到堆上。逃逸分析的基本行为就是分析对象动态作用域
-XX:+DoEscapeAnalysis //表示开启逃逸分析 (jdk1.8默认开启)
-XX:-DoEscapeAnalysis //表示关闭逃逸分析。
-XX:+EliminateAllocations //开启标量替换(默认打开)
JVM参数
如果一个对象在子程序中被分配,要使指向该对象的指针永远不会逃逸,对象可能是栈分配的候选,而不是堆分配
栈上分配
如果一个对象被发现只能从一个线程被访问到,那么对于这个对象的操作可以不考虑同步
同步消除(锁消除)
标量替换/分离对象
逃逸分析技术
逃逸分析(Escape Analysis)
如果某个线程正在自旋抢占该锁,则会抢占成功,这种策略会优先保证通过自旋抢占锁的线程获取锁,而处于等待队列中的线程则靠后,尽量避免直接挂起线程(挂起操作涉及系统调用,存在用户态和内核态切换,这才是重量级锁最大的开销)
为什么非公平锁的效率比较高?
gc
锁对象monitor什么时候清除?
synchronized原理
基本使用
Park & Unpark【LockSupport】
① 【NEW -> RUNABLE】 调用 start() 方法
调用obj.wait() RUNABLE -> WAITING
竞争成功: WAITING -> RUNABLE
竞争失败: WAITING -> BLOCKED
调用 obj.notify() , obj.notifyAll() , t.interrupt()`
② 【RUANNABLE <---> WAITING】线程用 synchronized(obj) 获取了对象锁后
当前线程调用 t.join()
线程运行结束,或调用了当前线程的 interrupt()
③ 【RUANNABLE <---> WAITING】
LockSupport.park()
调用 LockSupport.unpark(目标线程)`或调用了线程 的 interrupt()
④ 【RUANNABLE <---> WAITING】
obj.wait(long n)
等待时间超过了 n 毫秒,或调用 `obj.notify() , obj.notifyAll() , t.interrupt()`
⑤ 【RUNNABLE <---> TIMED_WAITING】线程用 synchronized(obj) 获取了对象锁后
t.join(long n)
等待时间超过了 n 毫秒,或t线程运行结束,或调用了当前线程的 `interrupt()`
⑥ 【RUNNABLE <---> TIMED_WAITING】
Thread.sleep(long n)
等待时间超过了 n 毫秒
⑦ 【RUNNABLE <---> TIMED_WAITING】
LockSupport.parkNanos(long nanos)` 或 `LockSupport.parkUntil(long millis)`
`LockSupport.unpark(目标线程)` 或调用了线程 的 `interrupt()` ,或是等待超时
⑧ 【RUNNABLE <---> TIMED_WAITING】
synchronized(obj) 获取了对象锁时如果竞争失败
竞争成功: BLOCKED -> RUNABLE
竞争失败: BLOCKED
持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争
⑨ 【RUNNABLE <---> BLOCKED】
⑩ 【RUNNABLE <---> TERMINATED】
线程状态转换
多个线程互相拿着对方的锁
死锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。- A线程在+- B线程在-
活锁
一个或者多个线程因为种种原因无法获得所需要的资源,导致一直无法执行的状态(CPU没有调度执行)。多线程中优先级高的会优先执行,并且抢占优先级低的资源,导致优先级低的线程无法得到执行(很难出现)
饥饿
活跃性
管程(悲观锁)阻塞
为什么无锁效率高?
CAS特点
循环时间太长
只能保证一个共享变量进行原子操作
ABA问题
缺点
CAS和volatile
AtomicInteger
AtomicLong
AtomicBoolean
基本类型
AtomicReference
原子更新引用类型里的字段原子类
利用版本号解决了ABA问题
AtomicStampedReference
原子更新带有标记位的引用类型(true & flase)
AtomicMarkableReference
引用类型
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
数组类型
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
原子更新引用类形字段的更新器
AtomicReferenceFieldUpdater
对象属性修改类
原子类
LongAdder
DoubleAdder
类型
LongAdder的基本思路就是font color=\"#e74f4c\
设计思路
LongAdder内部有一个base变量,一个Cell[]数组:- base变量:非竞态条件下,直接累加到该变量上- Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中
LongAdder内部结构
只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容了。这也是LongAdder设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。
Striped64#longAccumulate方法
LongAdder#add方法
LongAdder#sum方法
LongAdder 源码解析
原子累加器
简介
CAS
主要包含堆外内存的分配、拷贝、释放、给定地址值操作等方法。
内存操作
线程调度相关操作(park & unpark)
内存屏障
Unsafe
无锁(乐观锁)非阻塞
AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架【park & unpark】
阻塞等待队列
共享/独占
公平/非公平
可重入
允许中断
AQS具备的特性
Exclusive-独占,只有一个线程能执行,如ReentrantLock
Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS资源共享方式
AQS当中的同步等待队列也称CLH队列,CLH队列是一种基于font color=\"#e74f4c\
同步等待队列
AQS中条件队列是使用单向链表保存的,用nextWaiter来连接:调用await方法阻塞线程;当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)
条件等待队列
AQS定义两种队列
值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
CANCELLED,值为1,表示当前的线程被取消;
SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
state 状态
自定义同步器
AQS
lock.lockInterruptibly()
可打断
font color=\"#f57c00\
可超时
公平锁
① 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列尾部添加一个节点,所以调用Condition#await方法的时候必须持有锁。② 调用Condition#signal方法会将Condition队列的首节点移动到阻塞队列尾部,然后唤醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所以调用Condition#signal方法的时候必须持有锁,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。
Condition接口详解
条件变量
synchronized是JVM层次的锁实现,ReentrantLock是JDK层次的锁实现;
synchronized的锁状态是无法在代码中直接判断的,但是ReentrantLock可以通过ReentrantLock#isLocked判断;
synchronized是非公平锁,ReentrantLock是可以是公平也可以是非公平的;
synchronized是不可以被中断的,而ReentrantLock#lockInterruptibly方法是可以被中断的;
在发生异常时synchronized会自动释放锁,而ReentrantLock需要开发者在finally块中显示释放锁;
synchronized在特定的情况下对于已经在等待的线程是后来的线程先获得锁(回顾一下sychronized的唤醒策略),而ReentrantLock对于已经在等待的线程是先来的线程先获得锁
synchronized和ReentrantLock的区别
Lock【ReentrantLock】
结构
非公平锁原理
可重入原理
不可打断模式
可打断模式
可打断原理
公平锁原理
await
signal
ReentrantLock原理
流量控制 / 可以控制访问特定资源的线程数量
使用场景
原理(共享锁)
Semaphore
CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集
底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的。而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。
实现原理
CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。
CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。
join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。
CountDownLatch与Thread.join的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、isBroken(用来知道阻塞的线程是否被中断)等方法
CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
CyclicBarrier是通过ReentrantLock的\"独占锁\"和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现
CountDownLatch与CyclicBarrier的区别
CountDownLatch
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用
CyclicBarrier 同步屏障
对共享资源有读和写的操作,且写操作没有读操作那么频繁(读多写少)- 读读并发- 读写,写读,写写互斥
公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
可重入:读锁和写锁都支持线程重入。以读写线程为例:读线程获取读锁后,能够再次获取读锁。写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。
锁降级:遵循获取写锁、再获取读锁最后释放写锁的次序,写锁能够降级成为读锁。
- 读锁不支持条件变量
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
- 重入时降级支持:即持有写锁的情况下去获取读锁
注意事项
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。锁降级可以帮助我们拿到当前线程修改后的结果而不被其他线程所破坏,防止更新丢失。
因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据的准备工作。
必要的。主要是为了保证数据的可见性如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。
锁降级中读锁的获取是否必要呢?
锁降级
- 读锁: 共享锁- 写锁: 独占锁
用一个变量如何维护多种状态
在 ReentrantLock 中,使用 Sync ( 实际是 AQS )的 int 类型的 state 来表示同步状态,表示锁被一个线程重复获取的次数。但是,读写锁 ReentrantReadWriteLock 内部维护着一对读写锁,如果要用一个变量维护多种状态,需要采用“按位切割使用”的方式来维护这个变量,将其切分为两部分:font color=\"#e74f4c\
读写状态的设计
读锁的内在机制其实就是一个共享锁。一次共享锁的操作就相当于对HoldCounter 计数器的操作。获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1通过 ThreadLocalHoldCounter 类,HoldCounter 与线程进行绑定。HoldCounter 是绑定线程的一个计数器,而 ThreadLocalHoldCounter 则是线程绑定的 ThreadLocal。
- HoldCounter是用来记录读锁重入数的对象- ThreadLocalHoldCounter是ThreadLocal变量,用来存放不是第一个获取读锁的线程的其他线程的读锁重入数对象
HoldCounter 计数器
① 读写互斥② 写写互斥③ 写锁支持同一个线程重入④ writerShouldBlock写锁是否阻塞实现取决公平与非公平的策略(FairSync和NonfairSync)
写锁的获取
写锁的释放
① 读锁共享,读读不互斥② 读锁可重入,每个获取读锁的线程都会记录对应的重入数③ 读写互斥,锁降级场景除外④ 支持锁降级,持有写锁的线程,可以获取读锁,但是后续要记得把读锁和写锁读释放⑤ readerShouldBlock读锁是否阻塞实现取决公平与非公平的策略(FairSync和NonfairSync)
读锁的获取
读锁的释放
图解流程
ReentrantReadWriteLock
ReentrantReadWriteLock读属于悲观读锁,StampedLock为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
StampedLock
读写锁
JUC AQS
Queue接口
BlockingQueue常用方法示例
BlockingQueue接口
有界阻塞队列,先进先出,存取相互排斥
容量固定必须指定长度,没有扩容机制
没有元素的位置也占用空间,被 null 占位
数据结构:静态数组
存取是同一把锁,操作的是同一个数组对象,存取互相排斥
锁: ReentrantLock
出队: 队列count=0,无元素可取时,阻塞在该对象上
notEmpty
入队: 队列count=length,放不进去元素时,阻塞在该对象上
notFull
阻塞对象
从队首开始添加元素,记录putlndex (到队尾时设置为0) & 唤醒notEmpty
入队
从队首开始取出元素,记录takelndex (到队尾时设置为0) & 唤醒notFull
出队
数组删除的时间复杂度为 O(n),不需要将删除元素后面的元素向前进行偏移,只需要偏移指针即可
为什么对数组操作要设计成双指针?
两个指针都是从队首向队尾移动,保证队列的先进先出原则
核心代码定义
ArrayBlockingQueue
无界阻塞队列,可以指定容量,默认为 Integer.MAX VALUE,先进先出,存取互不干扰
可以指定容量,默认为Integer:MAX VALUE,内部类 Node 存储元素
数据结构:链表
takeLock — 取Node节点保证前驱后继不会乱
putLock — 存Node节点保证前驱后继不会乱putLock
锁分离: 存取互不干扰,存取操作的是不同的Node对象
remove() — 只添加 tackLock锁
remove(E e) —— 两个锁都加
删除元素时两个锁一起加
notEmpty - 出队: 队列count=0,无元素可取时,阻塞在该对象上
notFull - 入队: 队列count=capacity,放不进去元素时,阻塞在该对象上
队尾入队,由last指针记录
队首出队,由head指针记录
先进先出
最主要的原因就是 LinkedBlockingQueue实现的队列中的锁是分离的
线程池中为什么使用LinkedBlockingQueue而不用ArrayBlockingQueue?
队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能
LinkedBlockingQueue与ArrayBlockingQueue对比
LinkedBlockingQueue
一个链表阻塞双端队列,无界可以指定容量,默认为 Integer.MAX VALUE
内部类 Node 存储元素
数据结构: 链表 (同LinkedBlockingQueue)
存取是同一把锁,操作的是同一个数组对象
锁: ReentrantLock (同ArrayBlockingQueue)
无元素可取时,阻塞在该对象上 (count=0)
放不进去时,阻塞在该对象上 (count=capacity)
notFull
阻塞对象 (同LinkedBlockingQueue)
常用于 “工作窃取算法”
LinkedBlockingDeque
队首队尾都可,API比较丰富,队首队尾均可添加/删除元素
优先级高的先出队,优先级低的后出队
一个支持优先级排序的无界阻塞队列
默认容量11,可指定初始容量,会自动扩容,最大容量是(Integer.MAX VALUE- 8)
数据结构:数组+二又堆
存取是同一把锁
出队,队列为空时阻塞
阻塞对象: NotEmpty
不阻塞,永远返回成功,无界
传入比较器对象就按照比较器的顺序排序
如果比较器为 null,则按照自然顺序排序(小 -> 大)
根据比较器进行堆化(排序) 自下而上
优先级最高的元素在堆顶 弹出堆顶元素)
弹出前比较两个子节点再进行堆化 (自上而下)
业务办理排队叫号,VIP客户插队
电商抢购活动,会员级别高的用户优先抢购到商品
最小堆演示
插入 O(1)
查找 O(N)
删除 O(N)
普通线性数组(无序)
插入 O(N)
查询 O(1)
删除 O(1)
按顺序排列的有序向量
完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:大顶堆和小顶堆。大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值;小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。
二叉堆
如何构造优先级队列?
PriorityBlockingQueue
一个使用优先级队列实现的无界阻塞队列
与PriorityBlockingQueue类似,不过没有阳塞功能
数据结构: PriorityQueue
阻塞对象: Condition available
入队: 不阻塞,无界队列,与优先级队列入队相同,available
为空时阻塞
小于等于0则出队
不为空 (已有线程阻塞),直接阻塞
为空,则将当前线程置为leader,并按照过期时间进行阻塞
判断leader线程是否为空(为了保证优先级)
大于0,说明没过期,则阻塞
检查堆顶元素过期时间
淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单
商城订单超时关闭
饿了么订餐通知:下单成功后60s之后给用户发送短信通知
异步短信通知功能
服务器中,有很多客户端的连接,空闲一段时间之后需要关闭
关闭空闲连接
缓存中的对象,超过了存活时间,需要从缓存中移出
缓存过期清除
在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等
任务超时处理
DelayQueue
是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介。
第一个线程Thread0是消费者访问,此时队列为空,则入队 (创建Node结点并赋值)
第二个线程Thread1也是消费者访问,与队尾模式相同,继续入队
第二个线程Thread2是生产者,携带了数据e,与队尾模式不同,不进行入队操作直接将该线程携带的数据e返回给队首的消费者,并唤醒队首线程Thread1(默认非公平策略是栈结构),出队
先消费 (take) ,后生产(put)
在其内部类中维护了数据
数据结构: 链表(Node)
锁: CAS+自旋(无锁)
阻塞: 自旋了一定次数后调用 LockSupport.park0
put、offer 为生产者,携带了数据 e,为 Data 模式,设置到 SNode或QNode 属性中
take、poll 为消费者,不携带数据,为 Request 模式,设置到 SNode或QNode属性中
存取调用同一个方法: transfer0
线程访问阻塞队列,先判断队尾节点或者栈顶节点的 Node 与当前入队模式是否相同
相同则构造节点 Node 入队,并阻塞当前线程,元素 e 和线程赋值给 Node 属性
不同则将元素 e(不为 null) 返回给取数据线程,队首或栈顶线程被唤醒,出队
过程
队尾匹配(判断模式),队头出队,FIFO
TransferQueue
公平模式
栈顶匹配,栈顶出栈,LIFO
TransferStack
非公平模式(默认策略)
SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务
SynchronousQueue的一个使用场景是在线程池里,如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueu为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool0就使用了SynchronousQueue,这个线程池根据需要(亲任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收
源码解析
SynchronousQueue
是 SynchronousQueue 和 LinkedBlockingQueue 的合体
一个由链表结构组成的无界阻塞队列
数据结构: 链表Node
锁: CAS+自旋(无锁)
阳塞: 自旋了一定次数后调用 LockSupport.park0
可以自己控制存元素是否需要阻塞线程,比如使用四个添加元素的方法就不会阻塞线程,只入队元素,使用 transfer0 会阻塞线程
取元素与 SynchronousQueue 基本一样,都会阻塞等待有新的元素进入被匹配到
LinkedTransferQueue
常见阻塞队列
FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
CachedThreadPool 选取的是 SynchronousQueue
ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列
线程池对于阻塞队列的选择
功能
容量
比如 ArrayBlockingQueue & PriorityBlockingQueue
能否扩容
比如 ArrayBlockingQueue & LinkedBlockingQueue
内存结构
LinkedBlockingQueue & SynchronousQueue
性能
选择策略
如何选择适合的阻塞队列?
阻塞队列 Blocking Queue
JDK7 在添加到链表的时候采用的是头插法
HashMap并发死链
JDK 1.7原理(分段锁)
JDK1.8原理(分段锁+CAS)
ConcurrentHashMap
ConcurrentLinkedQueue
写入时拷贝的思想
CopyOnWriteArrayList
线程安全集合
详细内容参考
Unsafe类
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列(快,但是不适应于分布式)的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型
juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)
juc下队列存在的问题
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)
环形数组结构
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
元素位置定位
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据
无锁设计
用缓存行填充解决了伪共享的问题
实现了基于事件驱动的生产者消费者模型(观察者模式)
Disruptor的设计方案
使用RingBuffer来作为队列的数据结构,RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用
Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉
当需要覆盖数据时,会执行一个策略,Disruptor给提供多种策略,比较常用的:
常见且默认的等待策略,当这个队列里满了,不执行覆盖,而是阻塞等待。使用ReentrantLock+Condition实现阻塞,最节省cpu,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景
BlockingWaitStrategy策略
SleepingWaitStrategy策略
这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略。
YieldingWaitStrategy策略
采用死循环,消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用,cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用
BusySpinWaitStrategy策略
能覆盖数据是否会导致数据丢失呢?
RingBuffer数据结构
RingBuffer(环形缓冲区):基于数组的内存级别缓存,是创建sequencer(序号)与定义WaitStrategy(拒绝策略)的入口。Disruptor(总体执行入口):对RingBuffer的封装,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用。Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享。Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口。EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence
Disruptor核心概念
申请写入m个元素;若是有m个元素可以写入,则返回最大的序列号。这里主要判断是否会覆盖未读的元素;若是返回的正确,则生产者开始写入元素。
一个生产者单线程写数据的流程
多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。
申请写入m个元素;若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。
多个生产者写数据
申请读取到序号n;若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;消费者读取元素。如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。然后,消费者读取下标从3到6共计4个元素。
消费者读数据
多个生产者写数据的流程
读写流程
图灵-基础概念 + 原理 + 实战
图灵-基础概念 + 原理 + 实战
源码分析
美团技术
参考文章:建议看文章
高性能队列 Disruptor
扩展
其他
多任务
sychronized+wait/notify/notifyAll
reentrantLock+Condition(await/singal/singalAll)
cas+park/unpark
等待唤醒机制的规范实现。此模式依赖于Java线程的阻塞唤醒机制
异步模式之生产者消费者
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
同步模式之Balking犹豫模式
interrupt 打断阻塞中的线程会清空打断状态
利用 interrupt 打断
利用停止标记
中止模式之二阶段中止模式
解决并发问题,其实最简单的办法就是让共享变量只有读操作,而没有写操作
final使用
immutability 模式
子主题
Copy-on-Write 缺点就是消耗内存,每次修改都需要复制一个新的对象出来,果写操作非常少(读多写少的场景),可以尝试使用 Copy-on-Write
在Java中,CopyOnWriteArrayList 和 CopyOnWriteArraySet 这两个 Copy-on-Write 容器,它们背后的设计思想就是 Copy-on-Write;通过 Copy-on-Write 这两个容器实现的读操作是无锁的,由于无锁,所以将读操作的性能发挥到了极致
CopyOnWrite容器
类 Unix 的操作系统中创建进程的 API 是 fork(),传统的 fork() 函数会创建父进程的一个完整副本,例如父进程的地址空间现在用到了 1G 的内存,那么 fork() 子进程的时候要复制父进程整个进程的地址空间(占有 1G 内存)给子进程,这个过程是很耗时的。而 Linux 中fork() 子进程的时候,并不复制整个进程的地址空间,而是让父子进程共享同一个地址空间;只用在父进程或者子进程需要写入的时候才会复制地址空间,从而使父子进程拥有各自的地址空间
函数式编程的基础是不可变性(Immutability),所以函数式编程里面所有的修改操作都需要 Copy-on-Write 来解决
函数式编程
Copy-On-Write 模式
Thread-Specific Storage 模式 【ThreadLocal】
避免共享的设计模式
模式
在程序中遇到线程安全问题的时候,我们优先找出临界区,判断是否真正会出现并发安全问题,如果出现并发安全问题,一、首先要考虑是能否尽可能的不需要加锁来实现,类似于 (ThreadLocal 和 Disruptor)的设计,让每个线程的使用自己的资源,避免发生资源的抢占导致的线程安全问题二、如果不可避免的要使用锁来解决,优先尝试使用CAS自旋来实现,尽可能降低锁的量级(注意:如果遇到资源争抢比较严重的场景下需要考虑自旋的次数设置,避免因为大量线程自旋带来的资源的抢占)三、如果更不可避免使用到了类似于synchronized等需要线程阻塞的重量级锁,或者是分布式锁的时候,为了保证大量的线程争抢一把锁,在设计锁的时候需要尽可能的把锁的粒度设置小(例如分段锁、读写锁)来降低锁争抢带来的消耗解决线程安全最好的方案就是设计出来没有线程争抢资源的场景。
遇到并发问题的解决思路
并发编程
0 条评论
回复 删除
下一页