锁升级
锁升级过程
偏向锁
第一个线程拿到锁,将自己的线程信息标记在锁上,下次进来就不需要在拿去拿锁验证了。<br>如果超过1个线程去抢锁,那么偏向锁就会撤销,升级为轻量级锁,偏向锁并不算一把真正的锁。<br>
syn为重量级锁
JVM偷懒把任何跟线程有关的操作全部交给操作系统去做,例如调度锁的同步直接交给操作系统去执行,<br>而在操作系统中要执行先要入队,另外操作系统启动一个线程时需要消耗很多资源,<br>消耗资源比较重,重就重在这里。<br>
CAS
乐观锁&悲观锁
读取频繁使用乐观锁(CAS/版本号机制),写入频繁使用悲观锁。
实现过程
compare and swap,比较并交换
内存位置(V)、预期原值或者叫期望值(A)和新值(B)。 <br>如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。<br>否则,处理器不做任何操作。<br>
原理
真实的CAS操作是由CPU完成的,CPU会确保这个操作的原子性。CAS是CPU指令级别的操作,中间不能被打断,是靠CPU原语实现的。
Java类包
java.util.concurrent.atomic
缺陷
1、自旋时间长
线程在长时间内持有锁,等待竞争锁的线程一直自旋,即CPU一直空转,资源浪费在毫无意义的地方
限制自旋次数
2、只能保证一个共享变量原子操作
3、ABA问题
一个值原来是A,变成了B,然后又变成了A,在compare 的时候会发现没有被修改,基本数据类型无所谓
如果有两个线程t1和t2,一个对象A,而对象A中有各种属性甚至是引用了其他对象。<br>t1先引用了A,然后把A中的属性或者其他引用给更改了,而t2再去引用A的时候,<br>只是对比内存值,也就是A的引用地址,会发现没有任何改变。<br>
加上版本号
AtomicStampedReference
ThreadLocal
存储每个线程的私有数据
访问这个变量的每个线程都会有这个变量的本地副本
1、每个Thread中都具备一个ThreadLocalMap,而ThreadLocalMap可以存储以ThreadLocal为 key ,Object 对象为 value 的键值对。如果一个线程多个ThreadLocal的话,也是在同一个ThreadLocalMap中存储。<br>
2、Entry继承的弱引用,弱引用,一旦GC,则一定回收。<br>如果Entry是强引用的话,如果使用完ThreadLocal对象了,GC这时候开始回收了,它先回收了thread当中的ThreadLocal的引用(因为不用了),在继续删除ThreadLocal的实际对象时,发现被ThreadLocalMap中的key强引用着,这时候就不会回收(只要线程运行,就永远被引用),多了就造成内存泄露。<br>
3、由于ThreadLocalMap 的生命周期跟 Thread 一样长,对于重复利用的线程来说,如果没有手动删除(remove()方法)对应 key 就会导致entry(null,value)的对象越来越多,从而导致内存泄漏。<br>因为threadLocal是弱引用的,如果threadLocal被回收了,线程还在运行,这时候key为null,value是有值的,且无法访问到,堆积多的话也会导致内存泄露。这时候就得调用remove方法了<br>
应用场景
1、每个线程需要有自己单独的实例<br>2、实例需要在多个方法中共享,但不希望被多线程共享
在Spring的@Transaction事务声明的注解中就使用ThreadLocal保存了当前的Connection对象,<br>避免在本次调用的不同方法中使用不同的Connection对象。<br>
可以尝试使用ThreadLocal替代Session的使用,当用户要访问需要授权的接口的时候,<br>可以在拦截器中将用户的Token存入ThreadLocal中;<br>之后在本次访问中任何需要用户信息的操作都可以直接从ThreadLocal中拿取数据。<br>
解决线程安全问题
并发容器
ConcurrentHashMap
Node 数组 + 链表 / 红黑树,同时它采用的Synchronized 锁加CAS的机制,引用了锁升级的策略
在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。
CopyOnWriteArrayList
线程安全且读操作无锁的ArrayList
直接就不给读操作加锁,而且:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。
复制旧内存块,改写新内存,然后指针指向新内存,回收就内存。
ConcurrentLinkedQueue
非阻塞队列,主要使用 CAS 非阻塞算法来实现线程安全。
BlockingQueue
阻塞队列。 BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?<br>是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。<br>
线程池
Executor框架
调度任务进行执行,并得出异步计算的结果。
1、任务
被执行任务需要实现的接口,Runnable接口或者Callable接口。
2、任务的提交和执行
包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。<br>execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;<br>ExecutorService接口中提供了submit()方法,用于提交需要返回值的任务。<br>
3、任务的结果(异步计算)
包括接口Future和实现Future接口的FutureTask类。
4、常见对比
Runnable vs Callable
前者无返回值和抛异常,后者有。
execute vs submit
前者执行不需要返回值的任务,后者执行需要返回值的任务,返回值为Future。
shutdown vs shutdownNow
新任务停止,已存在的要执行完VS所有都停止
isTerminated vs isShutdown
调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true。<br>立即返回true
ThreadPoolExecutor
构造方法
1、corePoolSize 核心线程数
2、maximumPoolSize 最大线程数,队列满了,就新增线程,最大可新增的数量
3、workQueue 任务队列
4、keepAliveTime
没有任务执行的非核心线程的销毁时间
5、unit 时间单位
6、threadFactory 线程工厂
7、handler 拒绝策略
AbortPolicy 抛异常拒绝--默认
子主题
DiscardPolicy 不处理,直接拒绝
DiscardOldestPolicy 丢掉最早的未处理的任务
CallerRunsPolicy 直接在调用execute方法的线程中运行(主线程)被拒绝的任务
FixedThreadPool 指定线程数
因为maximumPoolSize无效,而LinkedBlockingQueue队列的最大值是 Integer.MAX_VALUE,<br>运行中的线程池会一直接受任务,直到队列满了还会接受,极端情况下会造成OOM。<br>
SingleThreadExecutor 单个线程
同上,会造成OOM
CachedThreadPool 线程数最大化
允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
ScheduledThreadPoolExecutor 定时执行
允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。而且在实际项目中应用较少,了解即可。
线程的上下文切换
任务从保存到再加载的过程就是一次上下文切换。
当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。
ThreadPoolExecutor源码
1、主要属性--构造函数
2、线程状态
private static final int RUNNING = -1 << COUNT_BITS;<br> private static final int SHUTDOWN = 0 << COUNT_BITS;<br> private static final int STOP = 1 << COUNT_BITS;<br> private static final int TIDYING = 2 << COUNT_BITS;<br> private static final int TERMINATED = 3 << COUNT_BITS;<br>
通过计算,获取到运行状态、工作线程数和ctl
状态跃迁图
3、execute方法
execute方法是ThreadPoolExecutor异步执行任务的方法,用来提交一个任务到线程池中去。<br>其中,ExecutorService接口中提供了submit()方法,最终也是调用的execute()方法。
执行流程
详细流程
4、addWorker方法
创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。
1、死循环中,非运行状态,且SHUTDOWN下传入了新任务,且任务队列已经空了,不创建新线程。<br>2、死循环中,一系列判断后,通过CAS更新工作线程数wc+1,并break结束循环。<br>3、加全局锁,创建线程并启动线程。只有启动成功线程,才算添加好woker对象了,否则只是一个无用的临时对象。<br>
5、Worker内部类
Worker构造里面会通过线程工厂创建新的Thread对象
实现了AQS和Runnable
通过ThreadFactory创建的Thread实例同时传入Worker实例,因为Worker本身实现了Runnable,所以可以作为任务提交到线程中执行。<br>只要thread 执行了start()方法,就能够执行Worker中的run()方法。<br>Worker继承自AQS,其中也用到了模板方法模式,重写了获取资源和释放资源的方法。<br>
AbstractQueuedSynchronizer
ReentrantLock
支持公平锁和非公平锁、可重入性,默认非公平锁
NonfairSync.lock()
compareAndSetState()
1、通过compareAndSetState也就是CAS设置锁的状态(stateOffset),如果锁状态设为1成功,<br>那就获取到了锁,就调用setExclusiveOwnerThread(Thread.currentThread());将当前线程设为独占线程。<br>
setExclusiveOwnerThread()
acquire()
2、如果上锁失败,就调用 acquire(1)方法。<br>
FairSync.lock()
acquire()
会让线程进入一个等待队列,然后去获得锁
核心
核心是volatile修饰的state和等待队列。
当一个线程过来的时候,如果被请求的共享资源是空闲的,那么就将此线程设置为工作线程,将共享资源设置为锁定状态。<br>如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配,将暂时获取不到锁的线程加入到队列中。<br>
虚拟双向队列(FIFO)
AQS 是通过将请求共享资源的每条线程封装成一个节点 Node来实现锁的分配
对于waitStatus,分为0,1,-1,-2,-3五种值,负值表示结点处于有效等待状态,而正值表示结点已被取消。<br>所以源码中很多地方用>0、<0来判断结点的状态是否正常。<br>
独占锁(ReentrantLock)
state
state表示共享资源的状态,即同步状态。
final修饰,不能被重写,修改state的状态,来设置同步,也就是加锁解锁。
ReentrantLock非公平锁详细加锁流程
1、ReentrantLock.lock()
2、NonfairSync.lock()
默认是非公平锁,<br>非公平获取不到锁后才转到公平锁<br>
3、compareAndSetState(0, 1)
CAS设置锁的状态,如果锁状态设为1成功,那就获取到了锁
4、setExclusiveOwnerThread
CAS成功后,设置线程为独占线程
5、acquire(1)
CAS失败,调用acquire(1)方法,尝试再次获取锁,<br>或者进入等待队列<br>
6、tryAcquire()<br>重写的AQS的方法
nonfairTryAcquire()<br>1、再次CAS尝试获取资源<br>2、判断是否为可重入锁
7、addWaiter()<br>
在双端链表添加尾节点的操作,<br>需要注意的是,双端链表的头结点是一个无参构造函数的头结点<br>
8、acquireQueued()<br>线程出队列
进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,<br>自己再拿到资源,然后进行工作。<br>
ReentrantLock公平锁详细加锁流程
1、FairSync.lock()
2、直接acquire(1)<br>和非公平锁的区别,先去重写tryAcquire()<br>/加入队列<br>
3、tryAcquire()<br>重写的AQS的方法
和非公平锁的区别,增加hasQueuedPredecessors()判断
hasQueuedPredecessors()判断,判断当前线程是否是队列中第一个有效的线程
4、addWaiter()
5、acquireQueued()<br>线程出队列<br>
详细acquireQueued()
1、结点进入队尾后,检查状态,找到park()点;<br>2、调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;<br>3、被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,<br>并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。<br>
4、cancelAcquire()<br>如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),取消结点在队列中的等待。<br>
解锁流程
1、ReentrantLock.unlock()
2、AQS.release()
3、tryRelease()
当前锁是不是没有被线程持有
4、setExclusiveOwnerThread(null)
如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
5、unparkSuccessor
把下一个不为空的节点unpark
下个节点是null或者下个节点被cancelled,就从后往前找到队列最开始的非cancelled的节点
如果是从前往后找,由于极端情况下入队的非原子操作和 CANCELLED 节点产生过程中断开 Next<br>指针的操作,可能会导致无法遍历所有的节点。<br>由于并发问题,addWaiter()入队操作和cancelAcquire()取消排队操作都会造成next链的不一致,而prev链是强一致的,所以这时从后往前找是最安全的。<br>
流程图
共享锁
CountDownLatch
允许 int个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
1、await(): 调用该方法的线程处于等待状态,直到latch的值被减到0或者当前线程被中断。一般都是主线程调用。——开门
2、countDown():使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程。——倒数<br>
两种典型用法
1、执行完所有的业务后才执行主线程
2、某一时刻,所有线程一起执行。
利用两个CountDownLatch,一个阻塞后再执行另一个
加解锁流程<br><br>
1、await()
2、acquireSharedInterruptibly()<br>尝试获取同步状态
3、tryAcquireShared(arg)<br>当前状态是否为0可释放锁<br>
4、doAcquireSharedInterruptibly()<br>获取同步状态失败,自旋,直到可以释放锁为止<br>
5、countDown()
6、releaseShared()
7、tryReleaseShared<br>自旋,直到state状态为0<br>
8、doReleaseShared()<br>尝试唤醒同步队列中头结点的后继节点<br>
CyclicBarrier
一组线程到达一个栅栏(也可以叫同步点)时被阻塞,直到最后一个线程到达栅栏时,栅栏才会开门,所有被拦截的线程才会继续干活。
AQS其他同步组件
Semaphore
允许几个线程同时执行,灯亮执行,灯灭不执行
ReentrantReadWriteLock
线程之间读不需要独占,写需要独占以避免结果出现偏差。
Exchanger
2个线程之间交换数据,而且是双向的。一个线程调用了exchange( )方法交换数据,到达了同步点,<br>然后就会一直阻塞等待另一个线程调用exchange( )方法来交换数据。比如游戏中两个人交换装备,必须都交换才行。<br>
LockSupport
阻塞和唤醒线程
LockSupport不需要在同步代码块里,实现了线程间的解耦。<br>unpark()可以先于park调用,所以不需要担心线程间的执行的先后顺序。