并发编程
2022-10-20 15:02:35 0 举报
AI智能生成
十年并发经验总结
作者其他创作
大纲/内容
1.并发基础-冯诺依曼计算机模型
1.简述:程序与数据一样存贮,按程序编排的顺序,一步一步地取出指令,自动地完成指令规定的操作是计算机最基本的工作模型
2.计算机五大模型
1.控制器(Control):控制,调度
2.运算器(Datapath):算术运算和逻辑运算
3.存储器(Memory)存储程序、数据和各种信号、命令
4.输入输出
子主题
3.CPU
控制单元,运算单元,存储单元
子主题
CPU缓存结构
现代CPU为了提升执行效率,减少CPU与内存的交互(交互影响CPU效率),一般在CPU上集成了多级缓存架构,常见的为三级缓存结构
L1Cache,分为数据缓存和指令缓存,逻辑核独占
L2Cache,物理核独占,逻辑核共享
L3Cache,所有物理核共享
子主题
存储器存储空间大小:内存>L3>L2>L1>寄存器;存储器速度快慢排序:寄存器>L1>L2>L3>内存;还有一点值得注意的是:缓存是由最小的存储区块-缓存行(cacheline)组成,缓存行大小通常为64byte。缓存行是什么意思呢?比如你的L1缓存大小是512kb,而cacheline=64byte,那么就是L1里有512*1024/64个cacheline
CPU读取存储器数据过程
1、CPU要取寄存器X的值,只需要一步:直接读取
2、CPU要取L1cache的某个值,需要1-3步(或者更多):把cache行锁住,把某个数据拿来,解锁,如果没锁住就慢了
3、CPU要取L2cache的某个值,先要到L1cache里取,L1当中不存在,在L2里,L2开始加锁,加锁以后,把L2里的数据复制到L1,再执行读L1的过程,上面的3步,再解锁。
4、CPU取L3cache的也是一样,只不过先由L3复制到L2,从L2复制到L1,从L1到CPU
5、CPU取内存则最复杂:通知内存控制器占用总线带宽,通知内存加锁,发起内存读请求,等待回应,回应数据保存到L3(如果没有就到L2),再从L3/2到L1,再从L1到CPU,之后解除总线锁定
2.并发基础-JAVA内存模型
什么是JMM模型?
详解
对象,数据和变量存储在主内存,修改在工作内存,主内存公有,工作内存线程私有
子主题
主内存和工作内存
主内存
主要存储的是Java实例对象,所有线程创建的实例对象都存放在主内存中,不管该实例对象是成员变量还是方法中的本地变量(也称局部变量),当然也包括了共享的类信息、常量、静态变量,<font color="#c41230">由于是共享数据区域,多条线程对同一个变量进行访问可能会发生线程安全问题</font>
工作内存
1.存储当前方法的所有本地变量信息(工作内存中存储着主内存中的变量副本拷贝),每个线程只能访问自己的工作内存
2.<font color="#c41230">基本类型存储在工作内</font>存,<font color="#c41230">引用类型</font>该变量的引用会存储在功能内存的帧栈中,而对象实例将存储在<font color="#c41230">主内存(</font>共享数据区域,堆)中。但对于实例对象的成员变量,不管它是基本数据类型或者包装类型(Integer、Double等)还是引用类型,都会被存储到堆区
3.主内存中的实例对象可以被多线程共享,倘若两个线程同时调用了同一个对象的同一个方法,那么两条线程会将要操作的数据拷贝一份到自己的工作内存中,执行完成操作后才刷新到主内存
子主题
并发问题出现的原因
线程安全
例子
数据同步的八大原子操作
1)lock(锁定):作用于主内存的变量,把一个变量标记为一条线程独占状态
2)unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
3)read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
4)load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中
5)use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎
6)assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量
7)store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作
8)write(写入):作用于工作内存的变量,它把store操作从工作内存中的一个变量的值传送到主内存的变量中
子主题
并发编程的可见性,原子性与有序性问题
原子性
原子性指的是一个操作是不可中断的,即使是在多线程环境下,一个操作一旦开始就不会被其他线程影响
解决方案:除了JVM自身提供的对基本数据类型读写操作的原子性外,<font color="#c41230">可以通过synchronized和Lock实现原子性</font>。因为synchronized和Lock能够保证任一时刻只有一个线程访问该代码块
可见性
可见性指的是当一个线程修改了某个共享变量的值,其他线程是否能够马上得知这个修改的值
<font color="#c41230">volatile关键字保证可见性</font>。当一个共享变量被volatile修饰时,它会保证修改的值立即被其他的线程看到,即修改的值立即更新到主存中,当其他线程需要读取时,它会去内存中读取新值。synchronized和Lock也可以保证可见性,因为它们可以保证任一时刻只有一个线程能访问共享资源,并在其释放锁之前将修改的变量刷新到内存中
有序性
有序性是指对于单线程的执行代码,我们总是认为代码的执行是按顺序依次执行的,但对于多线程环境,则可能出现乱序现象,因为程序编译成机器码指令后可能会出现指令重排现象,重排后的指令与原指令的顺序未必一致
volatile关键字来保证一定的“有序性”,synchronized和Lock 保证绝对有序
Java内存模型happens-before
即不需要通过任何手段就能够得到保证的有序性,这个通常也称为happens-before原则
具体规则
子主题
指令重排序
子主题
as-if-serial语义
不管怎么重排序(编译器和处理器为了提高并行度),(单线程)程序的执行结果不能被改变。编译器、runtime和处理器都必须遵守as-if-serial语义
volatile内存语义
1.作用
1.保证被volatile修饰的共享变量对所有线程总数可见的,也就是当一个线程修改了一个被volatile修饰共享变量的值,新值总是可以被其他线程立即得知。
<br>2.禁止指令重排序优化
2.volatile无法保证原子性
3.volatile禁止重排优化
禁止指令重排优化,从而避免多线程环境下程序出现乱序执行的现象
实现方式:硬件层的内存屏障
内存屏障,又称内存栅栏,是一个CPU指令,它的作用有两个,一是保证特定操作的执行顺序,二是保证某些变量的内存可见性(利用该特性实现volatile的内存可见性)
3.并发基础-缓存一致性协议
MESI
解析说明
子主题
子主题
MESI协议状态切换过程分析
子主题
JVM-JMM-CPU底层全执行流程
子主题
4.并发基础-其他
CAS
全称:Compare And Swap
子主题
简介
存在问题
ABA问题
因为CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值 原来是A,变成了B,又变成了B,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了,。
ABA解决方案--<font color="#c41230">版本号</font>
循环时间长,开销大
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销
只能保证一个变量的原子操作
线程间通信
synchronized与volatile
volatile:用来修饰子字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而 对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。<br>
synchronized;可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
等待和通知
经典示例
1.使用wait(),notity(),notityall()需要对对象加锁
2.调用wait()方法后,线程由Running变为Waiting并将当前线程放置到对象的等待队列中
3.A线程调用notify()或notifyall()后,等待线程依旧不会从wait中返回,需要A线程释放锁后,等待的线程才会wait中返回
4.notify将等待队列中的一个线程移到同步队列,notifyall将等待队列中的全部线程移到同步队列,状态由waiting变为blocking
5.从wait方法返回的前提是获得了调用对象锁
Thread.join()
Atomic: 原子类
基础类型
AtomicBoolean
AtomicInteger
AtomicLong
数组
AtomicIntegerArray
AtomicLongArray
BooleanArray
引用
AtomicReference
AtomicMarkedReference
AtomicStampedReference
FieldUpdater
AtomicLongFieldUpdater
AtomicIntegerFieldUpdater
AtomicReferenceFieldUpdater
Unsafe
5.并发编程-Synchronized
1.基础知识
1.使用
2.设计同步器的意义
同步器的本质就是加锁,加锁目的:序列化访问临界资源,即同一时刻只能有一个线程访问临界资源(同步互斥访问)
3.如何解决线程并发安全问题?
实际上,所有的并发模式在解决线程安全问题时,采用的方案都是序列化访问临界资源。即在同一时刻,只能有一个线程访问临界资源,也称作同步互斥访问。
4.Java 中,提供了两种方式来实现同步互斥访问:synchronized 和 Lock
2.Synchronized
1.synchronized本质
synchronized内置锁是一种对象锁(锁的是对象而非引用),作用粒度是对象,可以用来实现对临界资源的同步互斥访问,是可重入的
2.synchronized底层原理
synchronized关键字被编译成字节码后会被翻译成monitorenter 和 monitorexit 两条指令分别在同步块逻辑代码的起始位置与结束位置
3.Monitor监视器锁
子主题
什么是monitor?
4.对象的内存布局
1.对象头(Header)
HotSpot虚拟机的对象头包括两部分信息,第一部分是“Mark Word”,用于存储对象自身的运行时数据, 如哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID、偏向时间戳等等,它是实现轻量级锁和偏向锁的关键
32位虚拟机图解
2.实例数据(Instance Data)
存放类的属性数据信息,包括父类的属性信息
3.对齐填充(Padding)
虚拟机规定对象起始地址必须是8字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐;<br>
3.synchronized锁的膨胀升级过程
1.锁的升级
锁的状态总共有四种,无锁状态、偏向锁、轻量级锁和重量级锁。随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁,但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级
2.锁的状态
1.偏向锁
偏向锁的核心思想是,如果一个线程获得了锁,那么锁就进入偏向模式,此时Mark Word 的结构也变为偏向锁结构,当这个线程再次请求锁时,无需再做任何同步操作,即获取锁的过程
2.轻量级锁
对绝大部分的锁,在整个同步周期内都不存在竞争
3.自旋锁
避免线程上下文切换,采取的尝试循环获取锁
4.锁消除
JIT编译时通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁
5.锁的粗化
对于本质上是同一个锁,将多个锁合并成一个锁
3.逃逸分析
使用逃逸分析,编译器可以对代码做如下优化:
子主题
4.JVM锁的膨胀升级
子主题
6.并发编程-AQS详解
1.提供了实现锁及同步机制的<font color="#c41230">基本框架</font>,为同步状态的原子性管理、线程的阻塞、线程的解除阻塞及排队管理提供了一种通用的机制
2.AQS结构模型
1.一个state属性
state表示同步状态,它的类型为32位整型,对state的更新必须要保证原子性
2.一个FIFO队列
双向链表
Node结构
Node prev:前驱节点,指向前一个节点
Node next:后续节点,指向后一个节点<br>
Node next:后续节点,指向后一个节点<br>
Thread thread:入队列时的当前线程
int waitStatus:有五种状态:
同步等待队列,存放可以竞争共享对象的线程
3.条件队列Condition
链表:先进先出,用于存储暂时不能竞争共享队列的线程,例如:阻塞队列满后的生产者则放入条件等待队列中
4.线程的阻塞与解除阻塞操作
LockSupport.park();
LockSupport.unpark(t);<br>
3.竞争逻辑
AQS依赖它来完成同步状态state的管理,当前线程如果获取同步状态失败时(CAS自旋次数用完也没有获取到),AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态<br>
4.源码分析
1.获取锁
独占式获取同步状态
2.进入阻塞队列
tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。addWaiter方法如下
3.出列
首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点
5.ConditionObject
Lock实现等待/通知机制
ConditionObject队列与CLH队列
调用了await()方法的线程,会被加入到conditionObject等待队列中,并且唤醒CLH队列中head节点的下一个节点
线程在某个ConditionObject对象上调用了singnal()方法后,等待队列中的firstWaiter会被加入到AQS的CLH队列中,等待被唤醒
当线程调用unLock()方法释放锁时,CLH队列中的head节点的下一个节点(在本例中是firtWaiter),会被唤醒
注意:ConditionObject对象都维护了一个单独的等待队列 ,AQS所维护的CLH队列是同步队列,它们节点类型相同,都是Node
子主题
6.独占与共享模式
独占式:同一时刻仅有一个线程持有同步状态,如ReentrantLock
公平锁: 按照线程在队列中的排队顺序,先到者先拿到锁。<br>
非公平锁: 当线程要获取锁时,无视队列顺序直接去抢锁,不讲道理的,谁抢到就是谁的
共享式:多个线程可同时执行,如Semaphore/CountDownLatch等都是共享式的产物
7.AQS的模板方法设计模式
模板方法模式: 在一个方法中定义一个算法的骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。
AQS定义的一些模板方法如下:<br>
简言之,就是AQS提供tryAcquire,tryAcquireShared等模板方法,给子类实现自定义的同步器。
8.自定义同步器
首先需要确定你要实现的是独占锁还是共享锁,定义原子变量state的含义,再定义一个内部类去继承AQS,重写对应的模板方法
大纲图
7.AQS应用--Lock锁
1..ReentrantLock
特性
可重入锁
公平锁
新线程来了先判断是否等待队列中是否有线程等待,没有采取获取锁
非公平锁
新线程来了先尝试获取锁,不管同步等待队列中是否有线程等待
子主题
ReentrantLock中的AQS的流程
8.AQS应用--阻塞队列
1.概述BlockingQueue
是java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制
队列类型
无限队列 (unbounded queue ) - 几乎可以无限增长
有限队列 ( bounded queue ) - 定义了最大容量
队列数据结构
通常用链表或者数组实现<br>一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列<br>主要操作:入队(EnQueue)与出队(Dequeue)
2.常见的阻塞队列
ArrayBlockingQueue 由数组支持的有界队列
有界队列,先进先出,存取相互排斥
数据结构:静态数组
固定大小,没有扩容
没有元素则存放null
锁:reentrantLock
存取都是同一把锁,操作同一个对象,存取相互排斥
阻塞对象
notEmpty
出队 队列数为0 阻塞消费者
notFull
入队,队列满则色生产者
入队
从对首开始添加,添加后唤醒消费者
出队
从对首开始取出,添加后唤醒生产者
两个指针都是从对首开始往队尾移动,保证队列的先进先出
LinkedBlockingQueue 由链接节点支持的可选有界队列
两把锁
PriorityBlockingQueue 由优先级堆支持的无界优先级队列
DelayQueue 由优先级堆支持的、基于时间的调度队列
3.BlockingQueue API
Queue
BlockingQueue
4.用例-多线程生产者-消费者示例
题目:
实现方式:
9.并发编程应用--工具类
1.类图
2.重点工具类详解
1.Semaphore
1.简介
Semaphore 字面意思是信号量的意思,它的作用是<font color="#c41230">控制访问</font>特定<font color="#c41230">资源</font>的<font color="#c41230">线程数目</font>,底层依赖AQS的状态State
2.使用
构造方法
public Semaphore(int permits)<br>public Semaphore(int permits, boolean fair)
重要方法
public void acquire() throws InterruptedException<br>public void release()<br>tryAcquire(int args,long timeout, TimeUnit unit)
资源访问,服务限流(Hystrix里限流就有基于信号量方式)。
3.案例
2.CountDownLatch
1.简介
能够使一个线程<font color="#c41230">(主线程</font>)等待其他线程完成各自的工作后再执行
2.原理
通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务
3.使用
latch.countDown();
latch.await();
4.案例
原理分析
子主题
3.CyclicBarrier
1.简介
栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行
2.使用
构造方法是CyclicBarrier(int parties)其参数表示屏障拦截的线程数量
cyclicBarrier.await();
每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞
3.案例
4.Executors
1.作用
主要用来创建线程池,代理了线程池的创建,使得你的创建入口参数变得简单
2.使用
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
5.Exchanger
1.作用
当一个线程运行到exchange()方法时会阻塞,另一个线程运行到exchange()时,二者交换数据,然后执行后面的程序
2.应用场景 极少
3.工具类简介
1.接口: Condition
Condition为接口类型,它将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。可以通过await(),signal()来休眠/唤醒线程。
2.接口: Lock
Lock为接口类型,Lock实现提供了比使用synchronized方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的Condition对象
3.接口: ReadWriteLock
ReadWriteLock为接口类型, 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。
4.抽象类: AbstractOwnableSynchonizer
AbstractOwnableSynchonizer为抽象类,可以由线程以独占方式拥有的同步器。此类为创建锁和相关同步器(伴随着所有权的概念)提供了基础。AbstractOwnableSynchronizer 类本身不管理或使用此信息。但是,子类和工具可以使用适当维护的值帮助控制和监视访问以及提供诊断。
5.抽象类(long): AbstractQueuedLongSynchronizer
AbstractQueuedLongSynchronizer为抽象类,以 long 形式维护同步状态的一个 AbstractQueuedSynchronizer 版本。此类具有的结构、属性和方法与 AbstractQueuedSynchronizer 完全相同,但所有与状态相关的参数和结果都定义为 long 而不是 int。当创建需要 64 位状态的多级别锁和屏障等同步器时,此类很有用。
6.核心抽象类(int): AbstractQueuedSynchonizer
AbstractQueuedSynchonizer为抽象类,其为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础
7.锁常用类: LockSupport
LockSupport为常用类,用来创建锁和其他同步类的基本<font color="#c41230">线程阻塞</font>原语。LockSupport的功能和"Thread中的 Thread.suspend()和Thread.resume()有点类似",LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程。但是park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题
8.锁常用类: ReentrantLock
ReentrantLock为常用类,它是一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大
9.锁常用类: ReentrantReadWriteLock
ReentrantReadWriteLock是读写锁接口ReadWriteLock的实现类,它包括Lock子类ReadLock和WriteLock。ReadLock是共享锁,WriteLock是独占锁。<br>
10.锁常用类: StampedLock
它是java8在java.util.concurrent.locks新增的一个API。StampedLock控制锁有三种模式(写,读,乐观读),一个StampedLock状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问,数字0表示没有写锁被授权访问。在读锁上分为悲观锁和乐观锁。
11.工具常用类: CountDownLatch
CountDownLatch为常用类,它是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
12.工具常用类: CyclicBarrier
CyclicBarrier为常用类,其是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
13.工具常用类: Phaser
Phaser是JDK 7新增的一个同步辅助类,它可以实现CyclicBarrier和CountDownLatch类似的功能,而且它支持对任务的动态调整,并支持分层结构来达到更高的吞吐量
14.工具常用类: Semaphore
Semaphore为常用类,其是一个计数信号量,从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
15.工具常用类: Exchanger
Exchanger是用于线程协作的工具类, 主要用于两个线程之间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange()方法交换数据,当一个线程先执行exchange()方法后,它会一直等待第二个线程也执行exchange()方法,当这两个线程到达同步点时,这两个线程就可以交换数据了
16.threadLocal
10.并发编程-集合工具
常用类图
2.并发类
Queue: ArrayBlockingQueue
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
Queue: LinkedBlockingQueue
一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低
Queue: LinkedBlockingDeque
一个基于已链接节点的、任选范围的阻塞双端队列。
Queue: ConcurrentLinkedQueue
一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。
Queue: ConcurrentLinkedDeque
是双向链表实现的无界队列,该队列同时支持FIFO和FILO两种操作方式。
Queue: DelayQueue
延时无界阻塞队列,使用Lock机制实现并发访问。队列里只允许放可以“延期”的元素,队列中的head是最先“到期”的元素。如果队里中没有元素到“到期”,那么就算队列中有元素也不能获取到。
Queue: PriorityBlockingQueue
无界优先级阻塞队列,使用Lock机制实现并发访问。priorityQueue的线程安全版,不允许存放null值,依赖于comparable的排序,不允许存放不可比较的对象类型
Queue: SynchronousQueue
没有容量的同步队列,通过CAS实现并发访问,支持FIFO和FILO。
Queue: LinkedTransferQueue
JDK 7新增,单向链表实现的无界阻塞队列,通过CAS实现并发访问,队列元素使用 FIFO(先进先出)方式。LinkedTransferQueue可以说是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集, 它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。<br>
List: CopyOnWriteArrayList
ArrayList 的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的复制来实现的。这一般需要很大的开销,但是当遍历操作的数量大大超过可变操作的数量时,这种方法可能比其他替代方法更 有效。在不能或不想进行同步遍历,但又需要从并发线程中排除冲突时,它也很有用。<br>
Set: CopyOnWriteArraySet
对其所有操作使用内部CopyOnWriteArrayList的Set。即将所有操作转发至CopyOnWriteArayList来进行操作,能够保证线程安全。在add时,会调用addIfAbsent,由于每次add时都要进行数组遍历,因此性能会略低于CopyOnWriteArrayList。
Set: ConcurrentSkipListSet
一个基于ConcurrentSkipListMap 的可缩放并发 NavigableSet 实现。set 的元素可以根据它们的自然顺序进行排序,也可以根据创建 set 时所提供的 Comparator 进行排序,具体取决于使用的构造方法
Map: ConcurrentHashMap
是线程安全HashMap的。ConcurrentHashMap在JDK 7之前是通过Lock和segment(分段锁)实现,JDK 8 之后改为CAS+synchronized来保证并发安全。<br>
Map: ConcurrentSkipListMap
线程安全的有序的哈希表(相当于线程安全的TreeMap);映射可以根据键的自然顺序进行排序,也可以根据创建映射时所提供的 Comparator 进行排序,具体取决于使用的构造方法
11.并发编程-线程池
1.线程池
1.线程
线程状态切换
线程的实现方式
Runnable,Thread,Callable
2.协程
3.线程池
1.什么时候使用线程池?
1.单个任务处理时间比较短<br>2.需要处理的任务数量很大
2.线程池优势
1.重用存在的线程,减少线程创建,消亡的开销,提高性能<br>2.提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。<br>3.提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
3.常见类型图
4.线程池存在5种状态
1、RUNNING
2、 SHUTDOWN
3、STOP
4、TIDYING
5、 TERMINATED
5线程池重点属性和方法
属性
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
int COUNT_BITS = Integer.SIZE - 3
int CAPACITY = (1 << COUNT_BITS) - 1;
方法
runStateOf(int c),获取运行状态;
workerCountOf:获取活动线程数;
ctlOf:获取运行状态和活动线程数的值。
4.ExecutorService接口
1.接口
execute(Runnable command):履行Ruannable类型的任务,
<T> Future<T> submit(Callable<T> task)
提交Callable任务,并返回代表此任务的Future对象
<T> Future<T> submit(Runnable task, T result)
提交Runnable任务,并返回代表此任务的Future对象
shutdown():在完成已提交的任务后封闭办事,不再接管新任务
shutdownNow():停止所有正在履行的任务并封闭办事
isTerminated():测试是否所有任务都履行完毕了。
isShutdown():测试是否该ExecutorService已被关闭。
2.实现类
ThreadPoolExecutor 默认线程池
线程池的创建
使用
1、public void execute() //提交任务无返回值<br>2、public Future<?> submit() //任务执行完成后有返回值
参数说明
corePoolSize
线程池中的核心线程数
maximumPoolSize
线程池中允许的最大线程数
keepAliveTime
线程池维护线程所允许的空闲时间
unit
keepAliveTime的单位;
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口
threadFactory
线程工厂用来创建新线程
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务
线程池监控
线程池原理图解
源码分析
ThreadPoolExecutor.execute(Runnable command)
线程数<核心线程数-新增核心线程
addWorker(command, true)
线程池可用且线程数>核心线程数--任务添加单阻塞队列
workQueue.offer(command)
队列已满且线程数《最大线程数,则新增核心线程
线程数大于最大线程--调用阻塞队列
执行流程图
ScheduledThreadPoolExecutor
用于执行延时任务或定时任务。
使用实例:微服务定时发送心跳
构造防范与方法
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
延迟执行,无返回值
executor.schedule(()->{},1000, TimeUnit.MICROSECONDS);
延迟执行,有返回值
executor.schedule(()->{ return 1 ; },1000, TimeUnit.MICROSECONDS)
定时执行
executor.scheduleAtFixedRate(()->{},1000,5000,TimeUnit.MICROSECONDS);
定时执行,执行完开始计算时间
executor.scheduleWithFixedDelay(()->{},1000,5000,TimeUnit.MICROSECONDS);
内部模型
DelayQueue--<font color="#c41230">无界,优先级队列--会排序</font>
DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
DelayQueue也是一个无界队列;
执行过程:
1.任务都放在队列中
2.工作线程的执行过程:
1.工作线程会从DelayQueue取已经到期的任务去执行;<br>2.执行结束后重新设置任务的到期时间,再次放回DelayQueue<br>
源码分析
子主题
子主题
2.常用类
接口: Executor
Executor接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。
ExecutorService
ExecutorService继承自Executor接口,ExecutorService提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。可以关闭 ExecutorService,这将导致其停止接受新任务。关闭后,执行程序将最后终止,这时没有任务在执行,也没有任务在等待执行,并且无法提交新任务
ScheduledExecutorService
ScheduledExecutorService继承自ExecutorService接口,可安排在给定的延迟后运行或定期执行的命令。
AbstractExecutorService
AbstractExecutorService继承自ExecutorService接口,其提供 ExecutorService 执行方法的默认实现。此类使用 newTaskFor 返回的 RunnableFuture 实现 submit、invokeAny 和 invokeAll 方法,默认情况下,RunnableFuture 是此包中提供的 FutureTask 类。
FutureTask
<span style="font-size: inherit;">FutureTask 为 Future 提供了基础实现,如获取任务执行结果(get)和取消任务(cancel)等。如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消(除非使用runAndReset执行计算)。<font color="#c41230">FutureTask 常用来封装 Callable 和 Runnable</font>,也可以作为一个任务提交到线程池中执行。除了作为一个独立的类之外,此类也提供了一些功能性函数供我们创建自定义 task 类使用。FutureTask 的线程安全由CAS来保证</span><br>
核心: ThreadPoolExecutor
ThreadPoolExecutor实现了AbstractExecutorService接口,也是一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。线程池可以解决两个不同问题: 由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
核心: ScheduledThreadExecutor
ScheduledThreadPoolExecutor实现ScheduledExecutorService接口,可安排在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ThreadPoolExecutor 具有额外的灵活性或功能时,此类要优于 Timer。
核心: Fork/Join框架
ForkJoinPool 是JDK 7加入的一个线程池类。Fork/Join 技术是分治算法(Divide-and-Conquer)的并行实现,它是一项可以获得良好的并行性能的简单且高效的设计技术。目的是为了帮助我们更好地利用多处理器带来的好处,使用所有可用的运算能力来提升应用的性能。
工具类: Executors
Executors是一个工具类,用其可以创建ExecutorService、ScheduledExecutorService、ThreadFactory、Callable等对象。它的使用融入到了ThreadPoolExecutor, ScheduledThreadExecutor和ForkJoinPool中。注意:为什么很多公司不允许使用Executors去创建线程池
Future
提交方式
12.并发编程-框架
1.Future&ForkJoin
并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架
Fork/Jion特性:
ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好
ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等
ForkJoinPool 最适合的是<font color="#c41230">计算密集型</font>的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker
工作窃取算法
是指某个线程从其他队列里窃取任务来执行
算法
1.ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
2.每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
3.每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
4.在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成
5.在既没有自己的任务,也没有可以窃取的任务时,进入休眠
fork/join的使用
ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类
Fork/Join 框架提供了以下两个子类:
RecursiveAction
用于没有返回结果的任务
RecursiveTask
用于有返回结果的任务
CountedCompleter
在任务完成执行后会触发执行一个自定义的钩子函数
ForkJoinPool
ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
使用案例
定义fork/join任务,如下示例,随机生成2000w条数据在数组当中,然后求和
代码
1.定义类继承RecursiveTask,重写compute
2.定义forkJoinPool fjp = new ForkJoinPool(4);
fork/join框架原理
1、异常处理
ForkJoinTask 提供了 isCompletedAbnormally() 方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常,getException 方法返回 Throwable 对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回 null
2.ForkJoinPool构造函数解析
3、ForkJoinTask fork 方法
4、ForkJoinTask join 方法
5.ForkJoinPool.submit 方法
Fork/Join框架执行流程
任务执行分两种:
子主题
2.无锁并发框架-Disruptor
0 条评论
下一页