并发编程
2021-03-23 11:04:40 60 举报
AI智能生成
请大家不要直接克隆,着手梳理一遍才会变成自己的知识
作者其他创作
大纲/内容
一、机制以及实现原理
volatile
特点
1、保证可见性
在某个线程改变变量时,将更新后的值写入主存,并通知其他线程该变量失效,必须重新从主存读取
2、不保证原子性
在执行内存屏障之前,不同 CPU 依旧可以对同一个缓存行持有,一个 CPU 对同一个缓存行的修改不能让另一个 CPU 及时感知,因此出现并发冲突。线程安全还是需要用锁来保障,锁能有效的让 CPU 在同一个时刻独占某个缓存行,执行完并释放锁后,其他CPU才能访问该缓存行。<br>
解决方法:可以用AtomicInteger原子类解决原子性问题<br>AtomicInteger属于unsafe类,unsafe类提供了硬件级别的原子操作,可以直接访问操作系统底层,调用的是native方法
3、防止指令重排序
在对volatile修饰的变量操作前后加入内存屏障
ABA问题
小明在银行存有100元,他来取50元钱,有AB两个线程同时执行,但是A由于某种原因导致线程阻塞,B成功执行了,账户余额从100变成50,此时小红小红向小明汇款50元,线程C成功执行汇款操作,小米账户余额从50变成100,此时A线程继续执行,由于CAS拿到的值时100,所以可以成功更改成50,A线程执行完毕后,小明账户余额50,造成了有50元丢失
加入版本号即可解决该问题
synchronized关键字<br>
表现形式
1、修饰普通方法,锁着的是方法的调用者
2、修饰static静态方法,锁住的是类模板
3、修饰的是方法块时,锁住的是括号里的对象<br>
synchronized锁存在对象头里
原子操作的实现原理
CPU原子操作实现
1、总线锁
2、缓存锁
java原子操作实现
1、循环CAS
CAS三大问题
1、ABA问题
2、循环时间长开销大
3、只能保证一个共享变量的原子操作<br>(可以把多个变量放在一个对象里实现原子操作)
2、通过锁机制
java中的锁除了偏向锁外都用到了CAS
二、JMM(JavaMemoryModel)
作用
缓存一致性协议、用于规定数据读写规则<br>
模型
每个线程复制主内存的数据到线程私有内存中,再进行执行
带来的问题
修改不可见性
解决方法
volatile关键字
加锁
八大规则
1、不允许read和load、store和write操作之一单独出现<br>read与load之间、store与write之间是可插入其他指令的。<br>
2、不允许一个线程丢弃它的最近的assign操作<br>
3、不允许一个线程无原因地(没有发生过任何assign操作)把数据从线程的工作内存同步回主内存中
4、对一个变量实施use和store操作之前,必须先执行过了assign和load操作
5、一个变量在同一个时刻只允许一条线程对其执行lock操作<br>但lock操作可以被同一个条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。
6、如果对一个变量执行lock操作,将会清空工作内存中此变量的值<br>在执行引擎使用这个变量前,需要重新执行load或assign操作初始化变量的值。
7、如果一个变量实现没有被lock操作锁定,则不允许对它执行unlock操作,也不允许去unlock一个被其他线程锁定的变量。
8、对一个变量执行unlock操作之前,必须先把此变量同步回主内存(执行store和write操作)
八种原子操作
1、lock(加锁)
2、read(从主存读取到工作内存)
3、load(从工作内存加载)
4、use(使用)
5、asign(赋值)
6、store(把修改的值存入工作内存)
子主题
5、asign(赋值)
6、store(把修改的值存入工作内存)
7、write(把修改的值从工作内存写入主存)
8、unlock(解锁)
重排序
一个java文件从源代码到执行过程
1、源代码<br>
2、编译器优化重排序
3、指令并行重排序
4、内存系统重排序
5、最终序列
as-if-seria规则
不论怎么重排序,程序运行的结果不变
happens-before规则
在不改变程序运行结果的前提下,尽可能的提高并行度<br>
重排序的影响
好处:可以提高程序并行度,加快程序运行速度
坏处:对于并发的情况,存在数据依赖的程序可能会造成结果改变
双重检查锁定
并发时可能会出现返回了一个空的对象的情况<br>因为重排序可能先把对象引用指向了内存空间,但是还未初始化该对象
解决方法:加volatile修饰词,防止指令重排序
三、并发基础
线程
可以通过setPriority(int)方法来修改优先级,默认优先级是5<br>
可以通过jps查看当前线程状态
线程状态
NEW(还未执行start方法)
RUNNABLE(运行中,运行和就绪都称为运行中)
BLOCKED(阻塞状态)<br>
WAITING(等待状态)<br>
TIME-WAITING(超时等待,一定时间内未执行就不再执行)
TERMINATED(终止)<br>
DAEMON(守护线程)<br>
Thread.setDaemon(true)将线程设置为Daemon线程。<br>
当程序不存在非守护线程的线程,程序结束
一般被用来作为日志线程和GC线程
线程休眠
Thread.sleep(1000)
暂停线程时它不会释放锁,该方法会抛出InterrupttedException异常(如果有线程中断了当前线程)
⭐TimeUnit.SECONED.sleep(1)
提高了可读性,更推荐
Thread,join();//必须等待此线程执行完成再执行别的线程
多线程实现的三种方式
1、继承Thread类,重写run()方法,再通过new 类()获得线程
2、实现Runnable接口,重写run()方法,再通过new Thread(实现类对象)获得线程
优点
比继承Thread类更加灵活,因为java类是单继承的
3、实现Callable接口 ,重写call()方法<br>创建FutureTask对象,把自定义类作为参数传进去<br>new Thread(FutureTask对象)
优点
1、可以通过FutureTask.get()获得运行结果(Runnable接口不可以)
2、可以向上抛出异常(Runnable接口不可以)
lambda表达式
定义:Lambda 表达式是 JDK8 的一个新特性,可以取代大部分的匿名内部类,写出更优雅的 Java 代码<br>Lambda 规定接口中只能有一个需要被实现的方法,不是规定接口中只能有一个方法<br>
@FunctionalInterface:修饰函数式接口的,要求接口中的抽象方法只有一个。 这个注解往往会和 lambda 表达式一起出现。<br>
四种接口
函数式接口(有参有返回)<br>Function<String,String> function = (str)->{ return str+"ok"; };<br>
判定型接口(有参返回布尔值)<br>Predicate<String> predicate = (Str)->{ return Str.isEmpty(); };<br>
供给型接口(无参有返回)<br>Supplier<Integer> supplier = ()->{ return 1024; };<br>
消费型接口(有参无返回)<br>Consumer<String> consumer = (str)->{ System.out.println(str); };<br>
四、锁
对象头
数组
3个字宽
MarkWord
引用地址指针
数组长度
非数组
2个字宽
MarkWord
引用地址指针
锁的状态
无锁状态
偏向锁状态
偏向锁在jdk6,7默认开启,但是有延迟<br>可以通过参数关闭:-XX:BiasedLockingStartupDelay=0<br>
如果程序中线程经常竞争资源,可以通过参数关闭偏向锁<br>-XX:-UseBiasedLocking=false(关闭后默认进入轻量级锁)<br>
加锁:将对象头的MarkWord设置成当前线程
解锁:将对象头的MarkWord记录当前线程的信息置0
优点:加锁解锁不需要额外消耗,速度块
缺点:如果存在锁竞争,锁撤销会带来额外的消耗<br>
适用场景:只有一个线程访问同步块<br>
轻量级锁状态
加锁:把对象头的MarkWod复制到锁记录(DisplacedMarkWord)<br>将对象头的MarkWord替换为指向锁记录的指针(CAS原子操作)<br>
成功:获得锁
失败:自旋尝试获得锁
解锁:把对象头的锁记录指针题换成锁记录存储的MarkWord(CAS原子操作)
成功:解锁成功
失败:有锁竞争,锁膨胀升级成重量级锁
一旦锁升级成重量级锁后不会退回轻量级锁,线程争夺资源时不会自旋,而是阻塞<br>
优点:线程不会阻塞,提高相应速度<br>
缺点:自旋消耗CPU
适用场景:追求响应速度,同步块执行非常快<br>
重量级锁状态
优点:锁竞争不会自旋,不消耗CPU<br>
缺点:线程阻塞,响应时间长
适用场景:追求吞吐量,同步块执行时间长
消费者-生产者问题
1、synchronized修饰方法<br>
2、Lock锁版本
private int number = 0;<br><font color="#f15a23"><b> private Lock lock = new ReentrantLock();<br> private Condition condition = lock.newCondition();</b></font><br><br> //生产<br> public void increase() throws InterruptedException {<br> <b><font color="#f15a23">lock.lock();</font></b><br> try{<br> while(number > 0){ //这里不用if而用while是为了虚假唤醒,因为if只会判断一次,while是循环多次判断<br> <b> <font color="#f15a23">condition.await();</font></b><br> }<br> System.out.println(Thread.currentThread().getName() + " ==> " + ++number);<br> <b><font color="#f15a23">condition.signalAll();</font></b><br> }finally {<br> <b><font color="#f15a23">lock.unlock();</font></b><br> }<br> }<br><br> //消费<br> public void decrease() throws InterruptedException {<br> <b><font color="#f15a23">lock.lock();</font></b><br> try{<br> while(number == 0){ //这里不用if而用while是为了虚假唤醒,因为if只会判断一次,while是循环多次判断<br> <b><font color="#f15a23">condition.await();</font></b><br> }<br> System.out.println(Thread.currentThread().getName() + " ==> " + --number);<br> <b><font color="#f15a23">condition.signalAll();</font></b><br> }finally {<br> <b><font color="#f15a23"> lock.unlock();</font></b><br> }<br> }<br>
3、Lock锁的升级版本,指定唤醒<br>
condition0.await();//该监视器监视方法阻塞<br>condition1.signal();//唤醒condition1监视器监视的方法<br>
4、Semaphore版本
Semaphore semaphore = new Semaphore(3);//资源数量
semaphore.acquire(); //获取信号量,信号量-1
semaphore.release(); //释放信号量,信号量+1
AQS(AbstractQueuedSynchronized,队列同步器)<br>
volatile int state; 标志
CLH队列实现线程存储
将竞争资源失败的线程放在队列尾部,寻找安全点后park()休息,等待被唤醒
五、并发容器和框架
ConcurrentHashMap
为什么不使用HashMap?
HashMap是线程不安全的
1、多线程并发遇到扩容时会遇到HashMap成循环链表的情况<br>成环原因时jdk1.7中扩容使用的是头插法来加快扩容时间,jdk1.8已经改成了尾插法<br>
2、多线程并发时可能存在多个线程插值位置相同,导致先插入的值被覆盖<br>
为什么不使用Hashtable?
Hashtable的效率低<br>
1、Hashtable的每个方法都用Synchronized修饰,执行效率低
2、Hashtable的插入操作使用的是对象的hash值取余数,效率低<br>ConcurrentHashMap底层是HashMap,使用的是位运算,效率比取余高很多
为什么要使用ConcurrentHashMap?
1、jdk1.7时,Segment继承了ReentrantLock,具备锁和释放锁的功能。(分段锁)<br>ConcurrentHashMap只有16个Segment,并且不会扩容,最多可以支持16个线程并发写。
2、JDK1.8时,放弃了锁分段的做法,采用CAS和synchronized方式处理并发。<br>以put操作为例,CAS方式确定key的数组下标,synchronized保证链表节点的同步效果。
为什么放弃分段锁?<br>
1、减少内存开销<br>假设使用可重入锁,每个节点都要继承AQS,这无疑消耗巨大内存,但并不是每个节点都需要同步支持,只有链表的头节点/红黑树的根节点需要同步
2、获得JVM的支持<br>可重入锁毕竟是API级别的,后续的性能优化空间很小。synchronized则是JVM直接支持的,JVM能够在运行时作出相应的优化措施,使用synchronized能够随着JDK版本的升级而不改动代码的前提下获得性能上的提升。<br>
ConcurrentLinkedQueue<br>
1、采用循环CAS实现非阻塞
2、节点是Node类型
3、空参构造方法
head=tail=null
4、其他集合为参数的构造方法
1、先head=tail=null
2、遍历参数集合元素
3、checkNotNull(e)检查节点是否为空,空则报异常<br>
4、若head=null,则head=tail=newNode<br>
5、若head != null,t.lazySetNext(newNode);<br>lazySetNext方法调用的是native方法<br>
5、入队操作
1、调用add(e)<br>
2、add(e)调用offer(e)<br>
3、判断e是否为null,是则抛出异常
4、不为空,则循环CAS插入节点
5、tail节点不总是尾节点,插入第偶数个节点时tail节点才是尾节点<br>这么做的目的是减少循环CAS更新tail节点,加快入队,提升效率<br>
6、出队操作
1、调用poll<br>
2、当头节点不为空时,直接弹出头节点
3、当头节点为空时,弹出头节点的下一个节点,并循环CAS更新头节点<br>这么做的目的是减少循环CAS更新头节点的次数,加快出队,提升效率
阻塞队列
1、ArrayBlockingQueue<br>
基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。<br>
除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
默认情况下为非公平的。多线程并发存在资源竞争。
插入或删除元素时不会产生或销毁任何额外的对象实例
4组API
add remove element 在队列长度不满足时报异常<br>
offer poll peek 不满足条件时不报异常,返回null
put take 阻塞等待 一直等待
重载的offer和poll<br>public boolean offer(E e, long timeout, TimeUnit unit)<br>public E poll(long timeout, TimeUnit unit) throws InterruptedException<br> 时等待,重载方法在时间内未执行则跳过
2、LinkedBlockingQueue
基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE<br>如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了<br>
因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步<br>在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能
插入或删除元素时会生成一个额外的Node对象<br>
3、PriorityBlockingQueue
按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。<br>
注意,此阻塞队列为无界阻塞队列,即容量没有上限,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则最终耗尽所有的可用堆内存空间
在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
4、DelayQueue<br>
延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。<br>DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
可以用DelayQueue实现一个管理超时未响应的连接队列<br>
5. SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易
声明一个SynchronousQueue有两种不同的方式,即构造方法传入一个布尔值,默认是false,即非公平锁
来一个拿一个,只有一个存储空间
操作
synchronousQueue.put(1);
synchronousQueue.take();<br>
Fork/Join框架
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干<br>个小任务,最终汇总每个小任务结果后得到大任务结果的框架<br>⭐核心思想:先分支,再合并
使用工作窃取算法<br>
解释:任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。<br>当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务
优点::充分利用线程进行并行计算,减少了线程间的竞争。
缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并<br>且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
使用ForkJoin框架
通常情况下我们不需要直接集成ForkJoinTask类,只需要继承它的子类
Fork/Join框架提供了两个子类
1、RecursiveAction:用于没有返回结果的任务<br>
2、RecursiveTask:用于有返回结果的任务<br>
ForkJoinTask需要通过ForkJoinPool来执行
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,<br>ForkJoinTask数组负责将存放程序提交给ForkJoinPool,而ForkJoinWorkerThread负责执行这些任务
使用示例
1、继承RecursiveTask类
2、重写compute方法
1、判断任务大小决定是否分割任务
2、分割任务
ForkJoinDemo task1 = new ForkJoinDemo(start,mid);<br> ForkJoinDemo task2 = new ForkJoinDemo(mid+1,end);
3、分支调用
task1.fork();<br> task2.fork();
4、分支结果合并
task1.join() + task2.join()
3、创建ForkJoin池<br>ForkJoinPool forkJoinPool = new ForkJoinPool();
4、创建测试类<br>ForkJoinDemo forkJoinDemo = new ForkJoinDemo(0,1000000000);<br>
5、把测试类对象放在ForkJoin池内执行<br>forkJoinPool.execute(forkJoinDemo);<br>
六、原子操作类
Atomic包
基本类型
AtomicBoolean:原子更新布尔类型<br>
AtomicInteger:原子更新整型
int addAndGet(int n)原子相加<br>
boolean compareAndSet(int expect,int update)CAS方式更新值<br>
int getAndIncrement() 原子方式自增
int getAndSet(int newValue):以原子方式设置为newValue的值,并返回旧值。
void lazySet(int newValue):最终会设置成newValue,使用lazySet设置值后,可能导致其他<br>线程在之后的一小段时间内还是可以读到旧的值
AtomicLong:原子更新长整型
数组类型
AtomicIntegerArray:原子更新整型数组里的元素。<br>
AtomicLongArray:原子更新长整型数组里的元素。<br>
AtomicReferenceArray:原子更新引用类型数组里的元素
AtomicIntegerArray类主要是提供原子的方式更新数组里的整型
引用类型
AtomicReference:原子更新引用类型。
AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
七、并发工具包
CountDownLatch(减法计数器)<br>
1、初始化<br>
CountDownLatch countDownLatch = new CountDownLatch(6);
2、计数器减一操作<br>
countDownLatch.countDown();
3、等待计数器值为0 再向下执行<br>
countDownLatch.await();<br>
CyclicBarrier(加法计数器)<br>
1、初始化<br>
CyclicBarrier cyclicBarrier = new CyclicBarrier(n, () -> {});
到n再执行lambda表达式的逻辑
2、计数器到达n时会调起cyclicbarrier构造方法中的方法<br>
cyclicBarrier.await();
Semaphore(限流,ps:抢车位)
Semaphore semaphore = new Semaphore(3);//资源数量
semaphore.acquire(); //获取信号量,信号量-1
semaphore.release(); //释放信号量,信号量+1
Exchanger(交换者)
Exchanger用于进行线程间的数据交换
应用场景
遗传算法
结果核对
两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也<br>执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方
八、线程池⭐
用处:减少线程创建销毁的性能开销
通过pool.excute(()->{})执行,相当于new Thread(()->{})<br>
线程池必须关闭,可以放在finally块中
pool.shutdown();
三大方法
1、ExecutorService pool = Executors.newSingleThreadExecutor(); //单一线程的线程池<br>
2、ExecutorService pool = Executors.newFixedThreadPool(5); //固定线程的线程池
3、ExecutorService pool = Executors.newCachedThreadPool(); //可变线程,数量可伸缩的线程池
七大参数
1、int corePoolSize//核心线程数---正在工作的窗口<br>
2、int maximumPoolSize//最大线程数---一共有多少窗口
3、long keepAliveTime//线程存活时间---当候客区持续这段时间没人则关闭线程
4、TimeUnit unit//存活的时间单位
5、BlockingQueue<Runnable> workQueue//阻塞队列---new LinkedBlockingDeque<>(n)(候客区)
6、ThreadFactory threadFactory//线程创建工厂类---Executors.defaultThreadFactory()不需要修改<br>
7、RejectedExecutionHandler handler//拒绝策略
四大拒绝策略
1、new ThreadPoolExecutor.CallerRunsPolicy();//报异常
2、new ThreadPoolExecutor.DiscardPolicy();//丢弃任务,但是不抛出异常。
3、new ThreadPoolExecutor.DiscardOldestPolicy();//让最早的线程停止任务,所有任务竞争,最后来的任务拿到线程就执行,否则就不执行
4、new ThreadPoolExecutor.CallerRunsPolicy();//由调用线程(提交任务的线程)处理该任务(一般返回给main线程)<br>
自定义线程池
九、Executor框架
两级调度模型
在上层,程序通常把应用分解为若干个任务,用户级的调度器(Executor框架)将这些任务映射为固定数量的线程
在底层,操作系统内核将这些线程映射到硬件处理器上
Executor框架最核心的类是ThreadPoolExecutor<br>
三种ThreadPoolExecutor<br>
Executors.newSingleThreadExecutor(); //单一线程的线程池
Executors.newFixedThreadPool(5); //固定线程的线程池<br>
Executors.newCachedThreadPool(); //可变线程,数量可伸缩的线程池
如何调用?
1、ExecutorService pool = Executors.newCachedThreadPool();<br>
2、pool.execute(()->{System.out.println(Thread.currentThread().getName() + " ok");});<br> ⭐类似new Thread(()->{});<br>
3、pool.shutdown(); //用完必须关闭,最好放在finally块中
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor
它主要用来在给定的延迟之后运行任务,或者定期执行任务
FutureTask
FutureTask基于AQS实现<br>
FutureTask提供了对Future的基本实现
FutureTask还实现了Runnable接口
FutureTask交由Executor执行
也可以直接用线程调用执行(futureTask.run())
0 条评论
下一页