并发编程专题
2026-01-06 10:12:25 1 举报
AI智能生成
本文档将以详细阐述并发编程的重要性和核心概念为中心,其内容包括但不限于进程间通信、线程同步、死锁解决和高性能并发模式的设计与实现。文件采用PDF格式进行描述和分发,确保良好的跨平台兼容性和易读性。文档中将重点使用明了的语言和示例,以求简洁而精确地传达并发编程的复杂性,并使读者能够在实际开发中游刃有余地运用所学知识。旨在提高读者对于并发控制的深刻理解,促进在开发高性能及可扩展软件系统方面的能力。
作者其他创作
大纲/内容
java线程创建
继承Thread
没有返回结果
实现Runnable
没有返回结果
Callable
Callable的call方法可以有返回值,可以声明抛出异常。
配合有Future 类
Future 接口的API
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
boolean cancel (boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等
任务结束
任务结束
boolean isCancelled () 任务是否已经取消,任务正常完成前将其取消,则返回 true
boolean isDone () 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
V get () throws InterruptedException, ExecutionException 等待任务执行结束,然后获得V类型的结果。
InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出
CancellationException
CancellationException
V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException
局限性
Future表示一个异步计算的结果
并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。
无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供
这样的能力;
这样的能力;
无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是
无能为力的;
无能为力的;
没有异常处理:Future接口中没有关于异常处理的方法
FutureTask 使用(future的具体实现类)
CompletableFuture
CompletableFuture是Future接口的扩展和增强
串行[依赖]、并行、聚合的关系
CompletableFuture实现了对任务的编排能力
应用场景
描述依赖关系
thenApply() 把前面异步任务的结果,交给后面的Function
thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回
thenApply 和 thenCompose的区别
thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture;
thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个CompletableFutre 调用的结果在下一步的CompletableFuture 调用中进行运算,是生成一个新的CompletableFuture。
描述and聚合关系
thenCombine:任务合并,有返回值
thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。
描述or聚合关系
applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。
. acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。
并行执行
anyOf()和allOf()用于支持多个CompletableFuture并行执行
创建异步操作
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
runAsync 方法以Runnable函数式接口类型为参数,没有返回结果
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor
executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor
executor)
Supplier 接口的 get() 方法是有返回值的(会阻塞)
获取结果
join()
join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出
get()
get()方法抛出的是经过检查的异常,ExecutionException,
InterruptedException 需要用户手动处理(抛出或者 try catch)
InterruptedException 需要用户手动处理(抛出或者 try catch)
结果处理
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable>
action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable>
action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable>
action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable>
action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable>
action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
结果转换
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
Executor executor)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
Executor executor)
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends
CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends
CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends
CompletionStage<U>> fn, Executor executor) ;
CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends
CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends
CompletionStage<U>> fn, Executor executor) ;
结果消费
结果处理和结果转换系列函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行Action,而不返回新的计算值。
thenAccept系列:对单个结果进行消费
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor
executor);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor
executor);
thenAcceptBoth系列:对两个结果进行消费
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U>
other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U>other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U>other,BiConsumer<? super T, ? super U> action, Executor executor);
other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U>other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U>other,BiConsumer<? super T, ? super U> action, Executor executor);
当两个 CompletionStage 都正常完成计算的时候,就会执行提供的action消费两个异步的结果
thenRun系列:不关心结果,只对结果执行Action
thenRun 会在上一阶段CompletableFuture 计算完成的时候执行一个Runnable,Runnable并不使用该 CompletableFuture 计算的结果。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
ThreadLocal
描述
ThreadLocal类用来提供线程内部的局部变量
特点
线程安全: 在多线程并发的场景下保证线程安全
传递数据: 我们可以通过ThreadLocal在同一线程,不同组件中传递公共变量
线程隔离: 每个线程的变量都是独立的,不会互相影响
基本用法
ThreadLocal()
创建ThreadLocal对象
public void set( T value)
设置当前线程绑定的局部变量
public T get()
获取当前线程绑定的局部变量
public void remove()
移除当前线程绑定的局部变量
与synchronized的区别
synchronized
同步机制采用'以时间换空间'的
方式, 只提供了一份变量,让不同
的线程排队访问
方式, 只提供了一份变量,让不同
的线程排队访问
多个线程之间访问资源的同步
ThreadLocal
ThreadLocal采用'以空间换时
间'的方式, 为每一个线程都提供
了一份变量的副本,从而实现同
时访问而相不干扰
间'的方式, 为每一个线程都提供
了一份变量的副本,从而实现同
时访问而相不干扰
多线程中让每个线程之间的数据
相互隔离
相互隔离
优势
传递数据 : 保存每个线程绑定的数据,在需要的地方可以直接获取, 避免参数直接传递带来的代码耦合问题
线程隔离 : 各线程之间的数据相互隔离却又具备并发性,避免同步方式带来的性能损失
具体设计思路
每个Thread线程内部都有一个Map (ThreadLocalMap)
Map里面存储ThreadLocal对象(key)和线程的变量副本(value)
Thread内部的Map是由ThreadLocal维护的,由ThreadLocal负责向map获取和设置线程的变量值。
对于不同的线程,每次获取副本值时,别的线程并不能获取到当前线程的副本值,形成了副本的隔离,互不干
扰。
扰。
ThreadLocalMap
Entry继承WeakReference,也就是key(ThreadLocal)是弱引用,其目的是将ThreadLocal对象的生命周期和线程生命周期解绑。
出现内存泄漏的真实原因
没有手动删除这个Entry
CurrentThread依然运行
ThreadLocal内存泄漏的根源是:由于ThreadLocalMap的生命周期跟Thread一样长,如果没有手动删除对应key就会导致内存泄漏
如何避免
使用完ThreadLocal,调用其remove方法删除对应的Entry
使用完ThreadLocal,当前Thread也随之运行结束
hash冲突的解决
初始化ThreadLocalMap
定义了一个AtomicInteger类型,每次获取当前值并加上HASH_INCREMENT,HASH_INCREMENT= 0x61c88647,这个值跟斐波那契数列(黄金分割数)有关,其主要目的就是为了让哈希码能均匀的分布在2的n次方的数组里, 也就是Entry[] table中,这样做可以尽量避免hash冲突。
ThreadLocalMap使用线性探测法来解决哈希冲突的
cas与atomic原子类
什么是cas?
CAS(Compare And Swap,比较与交换),是非阻塞同步的实现原理,它是CPU硬件层面的一种指令
x86架构,IA-64架构Atomic::cmpxchg指令
CAS缺陷
自旋 CAS 长时间不成功,则会给 CPU 带来非常大的开销
只能保证一个共享变量原子操作
ABA 问题
atomic原子类
基本类型
AtomicInteger、AtomicLong、AtomicBoolean
引用类型
AtomicReference、AtomicStampedRerence、AtomicMarkableReference;
数组类型
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
对象属性原子修改器
AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、
AtomicReferenceFieldUpdater
AtomicReferenceFieldUpdater
原子类型累加器(jdk1.8增加的类)
DoubleAccumulator、DoubleAdder、LongAccumulator、
LongAdder、Striped64
LongAdder、Striped64
抽象队列同步器AQS
管程
同步的设计思想
管程:指的是管理共享变量以及对共享变量的操作过程,让他们支持并发
互斥:同一时刻只允许一个线程访问共享资源
同步:线程之间如何通信、协作
管程模型
MESA模型
目前主流
管程中引入了条件变量的概念,而且每个条件变量都对应有一个等待队列。条件变量和等待队列的作用是解决线程之间的同步问题
Hasen模型
Hoare模型
Java中针对管程有两种实现
一种是基于Object的Monitor机制,用于synchronized内置锁的实现
一种是抽象队列同步器AQS,用于JUC包下Lock锁机制的实现
一般是通过一个内部类Sync继承 AQS
将同步器所有调用都映射到Sync对应的方法
特性
阻塞等待队列
共享/独占
公平/非公平
可重入
允许中断
核心结构
AQS内部维护属性volatile int state
state表示资源的可用状态
三种访问方式
getState()
setState()
compareAndSetState()
定义了两种资源访问方式
Exclusive-独占,只有一个线程能执行,如ReentrantLock
Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS实现时主要方法
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示
成功,且有剩余资源。
成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回
false。
false。
两种队列
CLH同步等待队列: 主要用于维护获取锁失败时入队的线程。
Java中的CLH队列是原CLH队列的一个
变种,线程由原自旋机制改为阻塞机制。
变种,线程由原自旋机制改为阻塞机制。
条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁。
用单向列表保存的,用nextWaiter来连接
waitstatus五种节点状态
值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁
CANCELLED,值为1,表示当前的线程被取消
SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
具体实现子类
ReentrantLock
ReentrantLock是一种基于AQS框架的应用实现
线程池
是一种基于池化思想管理线程的工具
优势
降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
提高响应速度:任务到达时,无需等待线程创建即可立即执行
提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行
创建方式
ThreadPoolExecutor
corePoolSize:核心线程数,线程池初始化时默认是没有线程的,当任务来临时才开始创建线程去执行任务
maximumPoolSize:最大线程数,在核心线程数已满,且队列已满时,如果池子里的工作线程数小于
maximumPoolSize,则会创建非核心线程执行任务
maximumPoolSize,则会创建非核心线程执行任务
keepAliveTime:非核心线程数的空闲时间超过keepAliveTime就会被自动终止回收掉,但在
corePoolSize=maximumPoolSize时,该值无效,因为不存在非核心线程
corePoolSize=maximumPoolSize时,该值无效,因为不存在非核心线程
unit:keepAliveTime的时间单位
workQueue:用于保存线程任务的队列,主要分为无界、有界、同步移交等队列,当池子里的工作线
程数大于corePoolSize,就会将新进来的线程任务放入队列中
程数大于corePoolSize,就会将新进来的线程任务放入队列中
ArrayBlockingQueue(有界队列):队列长度有限,当队列满了就需要创建非核心线程执行任务,如果最大线程数
已满,则执行拒绝策略
已满,则执行拒绝策略
LinkedBlockingQueue(无界队列):队列长度无限,当任务处理速度跟不上任务创建速度,可能会导致内存占用过
多或OOM
多或OOM
SynchronousQueue(同步队列):队列不作为任务的缓冲处理,队列长度为0
threadFactory
创建线程的工厂接口,默认使用Executors.defaultThreadFactory()
另外可以实现ThreadFactory接口,自定义线程工厂
handler:线程池无法继续接收任务时(workQueue已满和maximumPoolSize已满)的拒绝策略
AbortPolicy:默认拒绝策略,中断抛出RejectedExecutionException异常
CallerRunsPolicy:让提交任务的主线程来执行任务
DiscardOldestPolicy:丢弃在队列中存在时间最久的任务,重复执行
DiscardPolicy:丢弃任务,不进行任何通知
实现RejectedExecutionHandler接口,自定义拒绝策略
Executors
不建议使用:因为线程可以无限创建,要么就是队列是无界队列
五种状态
RUNNING:会接收新任务并且会处理队列中的任务
SHUTDOWN:不会接收新任务并且会处理队列中的任务
STOP:不会接收新任务并且不会处理队列中的任务,并且会中断在处理的任务(注意:一个任务能不能被中断得看任务本身)
TIDYING:所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状
态,就会调用线程池的terminated()
态,就会调用线程池的terminated()
TERMINATED:terminated()执行完之后就会转变为TERMINATED
线程池为什么一定得是阻塞队列?
为非核心线程会poll(),核心线程会take(),非核心线程超过时间还没获取到任务后面就会自然消亡了
线程发生异常,会被移出线程池吗?
会,但是如果线程池还在运行状态,会新建一个工作线程
0 条评论
下一页