java 并发编程笔记

简介

并发优势

提高多处理器的资源利用率,对于同步阻塞IO单处理器上获得更高吞吐率

多线程带来的风险

  • 安全性问题:读取value,value++,并将结果写入value,竞态条件(Race condition)
  • 活跃性问题:线程A在等待线程B释放持有资源,而线程B永不释放该资源,A一直等待
  • 性能问题:频繁的出现上下文切换操作,当线程共享数据,使用同步,抑制编译器的优化,使内存缓存区数据无效,增加共享内存总线的同步流量

线程安全性

线程安全一般体现在两个方面

  • 1、多个thread对同一个java实例的访问(read和modify)不会相互干扰;

    它主要体现在关键字synchronized。如ArrayList和Vector,HashMap和Hashtable(后者每个方法前都有synchronized关键字)。
    如果你在iterator一个List对象时,其它线程remove一个element,问题就出现了

  • 2、每个线程都有自己的字段,而不会在多个线程之间共享。

    它主要体现在java.lang.ThreadLocal类,而没有Java关键字支持,如像static、transient那样。

关键点

  • 原子性:先检查后执行操作形成一个不可分割的操作来执行
  • 原子变量:CAS(判断之前拿到的状态和现在对比,有区别就别的线程改动过,没有直接赋值,版本解决ABA问题)
  • 加锁机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    内置锁:同步代码块(sychronized block)
    修饰方法的锁就是方法调用所在的对象,静态的方法则为Class类作为锁
    修饰代码片段的得看this或者其他变量为啥(不可放可变的变量,否则锁不住)
    进入同步代码前获得所,退出同步代码释放锁

    重入:锁关联一个获取计数器和一个所有者线程,当计数器0,这个锁意味着没有被任何线程持有
    当线程请求这种未被持有类型的锁,JVM将记下锁的持有者,并且计数器+1
    如果用一个线程再次获取锁,计数值递增1,当线程退出同步代码片段,计数器相应递减,当计数器0的时候,锁被释放
    同对象调用自己的方法,如果没有重入锁,将等待锁释放,而锁为自己持有,线程一直停顿

    加锁规则
    常见的加锁约定是,将所有的可变状态都封装在对象内部,并通过对象的内部锁对所有访问可变状态的代码路径进行同步
    而不是在A访问C变量的代码片段上加锁,否则B访问C上忘记加锁容易破坏线程安全

    活跃性与性能
    缩小同步代码片段范围,这样既能做到并发性,又能做到线程安全性
  • 对象的共享

    1
    2
    3
    4
    5
    6
    7
    8
    9
    在并发程序中使用和共享对象时,可以用以下使用策略

    线程封闭:线程封闭的对象只能由一个线程拥有,对象被封闭在该线程中,并且只能由这个线程修改
    栈封闭:线程内部使用或者线程局部使用
    ThreadLocal类:这个类能够使线程中的某个值与保存的对象相关联起来

    只读共享:在没有额外同步的情况下,共享的只读对象可以由多个线程并发访问,但任何线程都不能修改它,共享只读对象包括不可变对象和事实不可变对象
    线程安全共享:线程安全的对象在内部实现同步,因此多个线程可以通过对象的公有接口来进行访问,而不需要进一步的同步
    保护对象:被保护的对象只能通过特有特定的锁来访问,保护对象包括封装在其他线程安全对象中的对象,以及已发布的并且由某个特定额外锁保护的对象
1
2
当一个对象能够给其他代码引用。即为**发布**
当一个不该发布的对象发布了,就变成了**逸出**

守护线程与非守护线程

操作系统里面是没有守护线程概念,只有守护进程一说,只有JVM平台存在
Daemon的作用是为其他线程的运行提供服务, Thread守护线程本质上来说去没啥区别的,唯一的区别之处就在虚拟机的离开:如果User Thread全部撤离,那么Daemon Thread也就退出了。

  • thread.setDaemon(true)必须在thread.start()之前设置,否则抛IllegalThreadStateException异常,不能把正在运行的常规用户线程设置为守护线程。
  • Daemon线程中产生的新线程也是Daemon的。

    又有本质的区别了:守护进程fork()出来的子进程不再是守护进程,子进程的进程的父进程不是init进程,所谓的守护进程本质上说就是”父进程挂掉,init收养,然后文件0,1,2都是/dev/null,当前目录到/“






Java内存模型

线程之间的通信机制有两种:共享内存和消息传递。

Java线程之间的通信由Java内存模型(本文简称为JMM)控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。

抽象结构

java线程之间的通信由java内存模型(JMM)控制,JMM决定一个线程对共享变量(实例域、静态域和数组)的写入何时对其它线程可见。

从抽象的角度来看JMM定义了线程和主内存Main Memory(堆内存)之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有自己的本地内存Local Memory(只是一个抽象概念物理上不存在),存储了该线程的共享变量副本。

所以线程A和线程B之前需要通信的话,必须经过一下两个步骤:

  • 1、线程A把本地内存中更新过的共享变量刷新到主内存中。
  • 2、线程B到主内存中读取线程A之前更新过的共享变量。

1
2
3
4
本地内存A和B有主内存中共享变量x的副本。
假设初始时这三个内存中的x值都为0。线程A在执行时把更新后的x值(假设值为1)临时存放在自己的本地内存A中。
当线程A和线程B需要通信时线程A首先会把自己本地内存中修改后的x值刷新到主内存中,此时主内存中的x值变为了1。
随后线程B到主内存中去读取线程A更新后的x值,此时线程B的本地内存的x值也变为了1。


Java内存模型的同步操作和规则

为了保证并发时程序处理的准确性,这里就需要一些同步的手段,这里我们介绍一下Java内存模型定义的同步的八种操作和一些规则。
八种操作

  • lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态;
  • unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定;
  • read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用;
  • load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中;
  • use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎;
  • assign(赋值): 作用于工作内存的变量,它把一个执行引擎接受到的值赋值给工作内存的变量;
  • store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write操作;
  • write(写入): 作用于主内存的变量,他把store操作从工作内存中一个变量的值传送到主内存的变量中。

规则

  • 不允许read/load,store/write单一出现且必须按顺序执行,但中间可以插入其他指令;
  • 不允许一个线程丢弃离他最近的assign操作
  • 不允许一个线程未发生assign操作就将数据同步至主线程;
  • 一个新的变量,只能从主内存中诞生,不允许在工作内存中生成一个未被初始化的变量。
  • 一个变量在同一时刻只允许一个线程执行lock操作,lock可以被同一个线程执行多次,需要相同次数的unlock操作才能解锁;
  • 如果一个变量执行了lock操作,则会清空工作内存中的值,执行引擎使用这个变量前需要重新执行load或者assign操作来拿到初始化变量的值;
  • 如果一个变量没有被lock操作执行,则不允许对其进行unlock操作,也不允许unlock一个被其他线程lock的变量,unlock操作执行之前,必须将此变量同步回主内存。

重排序&&内存屏障

1
2
3
在执行程序时,为了提高性能编译器和处理器会对指令做重排序。
但是JMM确保在不同的编译器和不同的处理器平台之上,通过插入特定类型的Memory Barrier来禁止特定类型的编译器重排序和处理器重排序
为上层提供一致的内存可见性保证。
  • 1.编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
  • 2.指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction-Level Parallelism, ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
  • 3.内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。

1属于编译器重排序,2和3属于处理器重排序

JMM把内存屏障指令分为下列四类

屏障类型 指令示例 说明
LoadLoad Barriers Load1; LoadLoad; Load2 确保Load1数据的装载,之前于Load2及所有后续装载指令的装载。
StoreStore Barriers Store1; StoreStore; Store2 确保Store1数据对其他处理器可见(刷新到内存),之前于Store2及所有后续存储指令的存储。
LoadStore Barriers Load1; LoadStore; Store2 确保Load1数据装载,之前于Store2及所有后续的存储指令刷新到内存。
StoreLoad Barriers Store1; StoreLoad; Load2 确保Store1数据对其他处理器变得可见(指刷新到内存),之前于Load2及所有后续装载指令的装载。StoreLoad Barriers会使该屏障之前的所有内存访问指令(存储和装载指令)完成之后,才执行该屏障之后的内存访问指令。
名称 代码示例 说明
写后读 a = 1;b = a; 写一个变量之后,再读这个位置。
写后写 a = 1;a = 2; 写一个变量之后,再写这个变量。
读后写 a = b;b = 1; 读一个变量之后,再写这个变量。

上面三种情况只要重排序两个操作的执行顺序,程序的执行结果将会被改变。

前面提到过编译器和处理器可能会对操作做重排序。

编译器和处理器在重排序时会遵守数据依赖性,编译器和处理器不会改变存在数据依赖关系的两个操作的执行顺序。

as-if-serial语义:不管怎么重排序(编译器和处理器为了提高并行度),(单线程)程序的执行结果不能被改变




顺序一致性内存模型

如果程序是正确同步的,程序的执行将具有顺序一致性(sequentially consistent)–即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同
这里的同步是指广义上的同步,包括对常用同步原语(lock,volatile和final)的正确使用。

顺序一致性内存模型有两大特性

  • 一个线程中的所有操作必须按照程序的顺序来执行。
  • (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中每个操作都必须原子执行且立刻对所有线程可见。

假设这两个线程使用监视器来正确同步:A线程的三个操作执行后释放监视器,随后B线程获取同一个监视器。

假设这两个线程没有做同步

在顺序一致性模型中,所有操作完全按照程序的顺序串行执行,而在JMM中临界区内的代码可以重排序




happens-before

happens-before
从jdk5开始java使用新的JSR-133内存模型,基于happens-before的概念来阐述操作之间的内存可见性。

与程序员密切相关的happens-before规则如下:

  • 1、程序顺序规则:一个线程中的每个操作,happens-before于该线程中任意的后续操作。
  • 2、监视器锁规则:对一个锁的解锁操作,happens-before于随后对这个锁的加锁操作。
  • 3、volatile域规则:对一个volatile域的写操作,happens-before于任意线程后续对这个volatile域的读。
  • 4、传递性规则:如果 A happens-before B,且 B happens-before C,那么A happens-before C。
  • 5、线程启动规则:Thread对象的start()方法先行发生于此线程的每一个动作。
  • 6、线程终止规则:线程中的所有操作都先行发生于对此线程的终止检测,我们可以通过Thread.join()方法结束,Thread.isAlive()的返回值等作段检测到线程已经终止执行。
  • 7、线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测是否有中断发生。
  • 8、对象终结规则:一个对象初始化完成(构造方法执行完成)先行发生于它的finalize()方法的开始。


as-if-serial

不管怎么重排序,单线程下的执行结果不能被改变,编译器、runtime和处理器都必须遵守as-if-serial语义。


volatile

volatile内存语义的特点

  • 可见性:基于happens-before关系(对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入)
  • 原子性:基于内存屏障(对任意一个volatile变量的读/写具有原子性)

volatile写的内存语义如下

  • 当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存。

volatile读的内存语义如下

  • 当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。

为了实现volatile内存语义,JMM会分别限制这两种类型的重排序类型

是否能重排序 第二个操作
第一个操作 普通读/写 volatile读 volatile写
普通读/写 NO
volatile读 NO NO NO
volatile写 NO NO

从上表我们可以看出:

  • 当第二个操作是volatile写时,不管第一个操作是什么都不能重排序。这个规则确保volatile写之前的操作不会被编译器重排序到volatile写之后。
  • 当第一个操作是volatile读时,不管第二个操作是什么都不能重排序。这个规则确保volatile读之后的操作不会被编译器重排序到volatile读之前。
  • 当第一个操作是volatile写,第二个操作是volatile读时不能重排序。

是基于保守策略的JMM内存屏障插入策略:

  • 在每个volatile写操作的前面插入一个StoreStore屏障。
  • 在每个volatile写操作的后面插入一个StoreLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadStore屏障。


锁释放和锁获取的内存语义做个总结

  • 线程A释放一个锁,实质上是线程A向接下来将要获取这个锁的某个线程发出了(线程A对共享变量所做修改的)消息。
  • 线程B获取一个锁,实质上是线程B接收了之前某个线程发出的(在释放这个锁之前对共享变量所做修改的)消息。
  • 线程A释放锁随后线程B获取这个锁,这个过程实质上是线程A通过主内存向线程B发送消息。

为了实现锁的内存语义,查看了ReentrantLock的源代码

公平锁(利用volatile内存语义)

公平锁加锁方法lock()的方法调用轨迹如下

  1. ReentrantLock : lock()
  2. FairSync : lock()
  3. AbstractQueuedSynchronizer : acquire(int arg)
  4. ReentrantLock : tryAcquire(int acquires)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //获取锁的开始,首先读volatile变量state
if (c == 0) {
if (isFirst(current) &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

公平锁解锁方法unlock()的方法调用轨迹如下:

  1. ReentrantLock : unlock()
  2. AbstractQueuedSynchronizer : release(int arg)
  3. Sync : tryRelease(int releases)
1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c); //释放锁的最后,写volatile变量state
return free;
}
  • 加锁方法首先读volatile变量state
  • 释放锁的最后写volatile变量state
1
2
3
根据volatile的happens-before规则

释放锁的线程在写volatile变量之前可见的共享变量,在获取锁的线程读取同一个volatile变量后将立即变的对获取锁的线程可见。


非公平锁(利用CAS内存语义)

非公平锁,加锁方法lock()的方法调用轨迹如下:

  1. ReentrantLock : lock()
  2. NonfairSync : lock()
  3. AbstractQueuedSynchronizer : compareAndSetState(int expect, int update)
1
2
3
4
5
6
7
8
9
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}


// sun.misc.Unsafe类的compareAndSwapInt()方法的源代码
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);

native方法c++会根据当前处理器的类型来决定是否为cmpxchg指令添加lock前缀。

如果程序是在多处理器上运行就为cmpxchg指令加上lock前缀(lock cmpxchg)

intel的手册对lock前缀的说明如下

  1. 确保对内存的读-改-写操作原子执行。intel在原有总线锁的基础上做了一个很有意义的优化:如果要访问的内存区域(area of memory)在lock前缀指令执行期间已经在处理器内部的缓存中被锁定(即包含该内存区域的缓存行当前处于独占或以修改状态),并且该内存区域被完全包含在单个缓存行中,那么处理器将直接执行该指令。由于在指令执行期间该缓存行会一直被锁定,其它处理器无法读/写该指令要访问的内存区域,因此能保证指令执行的原子性。这个操作过程叫做缓存锁定,缓存锁定将大大降低lock前缀指令的执行开销,但是当多处理器之间的竞争程度很高或者指令访问的内存地址未对齐时仍然会锁住总线。
  2. 禁止该指令与之前和之后的读和写指令重排序。
  3. 把写缓冲区中的所有数据刷新到内存中。

上面的第2点和第3点所具有的内存屏障效果足以同时实现volatile读和volatile写的内存语义。


现在对公平锁和非公平锁的内存语义做个总结:

  • 公平锁和非公平锁释放时,最后都要写一个volatile变量state。
  • 公平锁获取时,首先会去读这个volatile变量。
  • 非公平锁获取时,首先会用CAS更新这个volatile变量,这个操作同时具有volatile读和volatile写的内存语义。

ReentrantLock的分析可以看出,锁释放-获取的内存语义的实现至少有下面两种方式:

  1. 利用volatile变量的写-读所具有的内存语义。
  2. 利用CAS所附带的volatile读和volatile写的内存语义。



final

写final域的重排序规则禁止把final域的写重排序到构造函数之外。

  • JMM禁止编译器把final域的写重排序到构造函数之外。
  • 编译器会在final域的写之后构造函数return之前,插入一个StoreStore屏障。这个屏障禁止处理器把final域的写重排序到构造函数之外。

读final域的重排序规则

  • 在一个线程中初次读对象引用与初次读该对象包含的final域,JMM禁止处理器重排序这两个操作(注意这个规则仅仅针对处理器)。编译器会在读final域操作的前面插入一个LoadLoad屏障。



JUC工具

同步工具类:信号量(Semaphore),栅栏(Barrier)以及闭锁(Latch)

闭锁(CountDownLatch)

闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量
countDown方法递减计数器,而await方法等待计数器达到零,表示所有需要等待的时间都已经发生了
如果计数器的值非零,则awaite会一直阻塞直到计数器为零,或者等待的线程中断,或者等待超时

  • FutureTask:也可以做闭锁,FutureTask表示的计算是通过Callable实现的,相当于一种可生产结果的Runnable,状态处理以下三种:
    • 等待运行(Waiting to run)
    • 正在运行(Running)
    • 运行完成(Completed)
      1
      FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递能实现结果的安全发布

栅栏(CyclicBarrier)

栅栏的应用面和闭锁差不多,就是如果要周期运用的特性就考虑用栅栏而不是闭锁,
而闭锁是一次性对象,一旦进入终止状态,就不能被重置

信号量(Semaphore)

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量

执行操作时首先获得许可,并在使用以后释放许可,如果没有许可,那么acquire将阻塞直到有许可,release方法将释放一个许可给信号量

交换器(Exchanger)

Exchanger用于进行线程间的数据交换。

它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ExchangerTest {

private static final Exchanger<String> exgr = new Exchanger<String>();

private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

public static void main(String[] args) {

threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A";// A录入银行流水数据
exgr.exchange(A);
} catch (InterruptedException e) {
}
}
});

threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水B";// B录入银行流水数据
String A = exgr.exchange("B");
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
+ A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});

threadPool.shutdown();

}
}


//如果两个线程有一个没有到达exchange方法,则会一直等待,如果担心有特殊情况发生
//避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长。

线程局部变量(ThreadLocal)

线程本地变量,也有些地方叫做线程本地存储;
ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}


private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}


protected T initialValue() {
return null;
}

void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}


Thread的成员变量:ThreadLocal.ThreadLocalMap threadLocals = null;






线程池

Executors:指管理一组同构工作线程的资源池,线程池与工作队列(Work Queue)密切相关,其中工作队列保存了所有等待执行的任务,工作线程任务简单,从工作队列获取一个任务,执行任务,然后返回线程池并等待下一个任务
Executor执行的四个生命周期阶段:创建,提交,开始和完成

  • newFixedThreadPool:将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程发生Exception而结束,线程池会补充一个新的线程)
  • newCacheThreadPool:将创建一个缓存线程池,如果当规模超过了处理需求,那么回收空闲线程,如果需求量大,则添加线程,线程池规模不限制
  • newSingleThreadExecutor:单个线程的线程池,如果线程异常退出,会创建另一个线程来替代,队列顺序串行拿任务(FIFO,LIFO,优先级)看队列是什么队列
  • newScheduledThreadPool:固定长度线程,但是执行却以固时或者延时执行任务类似Timer
    • scheduleAtFixedRate:上一个任务开始执行之后延迟几秒之后再执行,是从上一个任务开始时开始计算
    • scheduleWithFixedDelay:上一个任务结束执行之后延迟几秒之后再执行,是从上一个任务结束时开始计算
      1
      2
      3
      4
      5
      6
      ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
      如果线程池中运行的线程 小于corePoolSize ,即使线程池中的线程都处于空闲状态,也要 创建新的线程 来处理被添加的任务。
      如果线程池中运行的线程大于等于corePoolSize,但是缓冲队列 workQueue未满 ,那么任务被放入缓冲队列 。
      如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满(即无法将请求加入队列 ),并且线程池中的数量小于maximumPoolSize,建新的线程 来处理被添加的任务。
      如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize ,那么通过 handler 所指定的策略来处理此任务。
      当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止 。这样,线程池可以动态的调整池中的线程数。

RejectedExecutionHandler的种类

  • ThreadPoolExecutor.AbortPolicy()
    抛出java.util.concurrent.RejectedExecutionException异常,注意,这是线程池的默认策略
  • ThreadPoolExecutor.CallerRunsPolicy()
    重试添加当前的任务,他会自动重复调用execute()方法
  • ThreadPoolExecutor.DiscardOldestPolicy()
    抛弃旧的任务(最先进入队列的任务)
  • ThreadPoolExecutor.DiscardPolicy()
    抛弃当前的任务(即将进入队列的任务)

任务队列

  • 无界队列:newFixedThreadPoolnewSingleThreadExecutor默认情况使用无界队列LinkedBlockingQueue
  • 有界队列:稳妥的资源管理策略是使用有界队列,如ArrayBlockingQueue,有界的LinkedBlockingQueue,PriorityBlockingQueue
  • 同步队列:synchronousQueue其实并不是一个队列,它没有提供缓存队列元素的功能,他只是线程之间的移交机制,newCacheThreadPool工厂方法就用到了synchronousQueue
  • 延迟队列:ScheduledThreadPoolExecutor用到了DelayedWorkQueue






服务停止

线程池关闭

ExecutorService提供了两种关闭方法:shutdown正常关闭,shutdownNow强行关闭。

  • shutdownNow:首先关闭当前正执行的任务,然后返回所有尚未启动的任务清单
  • shutdown:一直等待队列中的所有任务都执行完成后才关闭
    两种关闭方式差别在于各自的安全性和响应性

毒丸对象

毒丸是指一个放在队列上的对象,其含义是:”当得到这个对象时,立即停止”

只有在无界队列才能可靠工作,如果有界队列,队列满了,很多个producer再进行put的时候发生了阻塞…..恰好这个时候发生了中断操作,虽然producer在put的时候能够响应中断操作,进入InterruptedException的catch代码块进行put毒丸,但producer的put毒丸又阻塞了,会让producer停止不了

JVM关闭钩子

关闭钩子是通过Runtime.addShutdownHook注册的线程
钩子可以在一下几种场景中被调用:

  • 程序正常退出
  • 使用System.exit()
  • 终端使用Ctrl+C触发的中断
  • 系统关闭
  • OutOfMemory宕机
  • 使用Kill pid命令干掉进程(注:在使用kill -9 pid时,是不会被调用的)

用途

  • 删除临时文件
  • 清除无法由操作系统自动清除的资源

线程中断

线程状态如下

  • 新建(new):当线程被创建时,它只会短时间处于这种状态。它已经分配了必要的系统资源,完成了初始化。之后线程调度器将把这个线程转变为可运行或者阻塞状态;
  • 就绪(Runnable):在这种状态下,只要调度器分配时间片给线程,线程就可以运行了;
  • 阻塞(Blocked):某条件阻止线程运行,调度器将忽略阻塞状态的线程,不分配时间片,直到线程进入就绪状态
  • 死亡(Dead):处于死亡或者终结状态的线程将不再是可调度的,并且也不会被分配到时间片。任务死亡的方式通常是从run方法返回,或者被中断;

进入阻塞状态的几个原因

  • 调用sleep(mils)
  • wait使挂起,直到得到notify或者notifyAll
  • 等待输入输出
  • 试图调用同步方法,但对象锁不可用

支持中断的阻塞

  • Thread.sleep
  • Object.wait
  • Reentrantlock(lock.lockInterruptibly)
  • BlockingQueue.take / put

    响应中断操作包括:清除中断状态,抛出InterruptedException

    中断操作真正于:并不会真正地去中断一个正在运行的线程,而是发出中断请求,然后由线程在下一个合适时刻中断自己,这些时刻也被称为取消点,例如:wait,sleep,join等

不支持中断的阻塞

  • synchronized对象锁
  • IO阻塞
  • 运行期间

处理不支持中断的阻塞

  • 运行期间
    1
    2
    3
    4
    5
    6
    7
    8
    public void run() {
    try {
    while (!Thread.interrupted()) {
    .......
    }catch (InterruptedException e) {
    System.out.println("interrupted from blocked...");
    }
    }

通过interrupted方法可以读取到线程该标志位,判断线程是否被中断进而执行退出任务的策略判断

  • IO阻塞

    • Socket阻塞,通过关闭Socket就会抛出SocketException异常来终结
    • 同步IO,当中断InterruptibleChannel上等待的线程,将抛出CloseByInterruptException并关闭链路,将导致所有在链路上的阻塞线程都抛出AsynchronousCloseException
    • Selector的异步IO,如果线程调用Select.select方法阻塞,可以调用close或者wakeup方法抛出ClosedSelectorException并提前返回
  • synchronized对象锁
    替换成可中断的锁






Callable与Runable

Callable与Runnable的功能大致相似,Callable中有一个call()函数,但是call()函数有返回值,而Runnable的run()函数不能将结果返回

Future

Future对于具体的Runnable或者Callable任务的执行结果进行如下操作

  • 取消
  • 查询是否完成
  • 获取结果
  • 设置结果操作

FutureTask(RunnableFuture)

FutureTask实现了RunnableFuture接口
RunnableFuture又继承了Runnbale,Futrue这两个接口
所以总结FutureTask实现了Runnbale,Future接口

Executor的执行流

Executor就是RunnableCallable的调度容器

  1. submit 提交任务后newTaskFor生成RunnableFuture(即FutureTask)
  2. execute 执行如Runnable地执行RunnableFuture(FutureTask),FutureTask内部封装了Future的操控






非阻塞锁(CAS)

悲观锁(独占锁)

其他线程不会造成干扰(获取正确的锁)的情况下才能执行
设计加锁模型的都是悲观锁

乐观锁

借助”冲突检查机制”判断更新过程中是否存在来自其他线程的干扰,如果存在则操作失败,并且可以重试(也可以不重试)

多处理器操作而设计的一些特殊指令

  1. Test-and-Set(支持原子的测试并设置)
  2. Fetch-and-Increment(获取并递增)
  3. Swap(交换)
  4. Compare-and-Swap(CAS比较并交换)
  5. Load-Linked(关联加载)
  6. Store-Conditional(条件存储)

几乎所有处理器都包含原子的读-改-写指令,例如CAS,LL,SC

Compare-and-Swap(CAS比较并交换)

CAS包含3个操作数

  • 需要读写内存位置V
  • 进行比较的值A和拟写入的新值B
  • 当且仅当V的值等于A时,CAS才会通过原子方式用新值B更新

在支持CAS指令平台上运行时把他们编译成相应的机器指令

ABA问题

“如果V的值首先从A变B,再由B变成A”
多加版本号:即A变成B,然后又变成A,版本号也将是不同的
各种乐观锁的实现中通常都会用版本version来对记录或对象标记,避免并发操作带来的ABA问题
类可以用AtomicStampedReference






自定义同步

类库没有你需要的同步机制,那可以利用语言和类库提供的底层机制来构造自己的同步机制

  • 内置的条件队列
  • 显式的Condition对象
  • AbstractQueuedSynchronized框架

条件队列/谓语

条件队列:一组线程(等待线程集合)能够通过某种方法来等待特定的条件变为真
条件谓语:是使某个操作成为状态依赖操作的前提条件。

每个Java对象都可以作为一个锁,每个对象统一可以作为条件队列
并且Object中的wait,notify,notifyAll等方法构成了内部条件队列的API




Condition对象

Lock是广义内置锁,Condition是广义内置条件队列
普通内置队列存在缺陷,每个内置锁都只能有一个相关联的内置条件队列
因而多线程在多个条件谓语在同一个条件队列不合适,这些因素导致notifyAll时所有等待的线程并非是同一类型的条件谓语等待线程

想要突破上面这样的效果,可以用Condition对象和Lock对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 条件谓语: notFull (count < items.length)
private final Condition notFull = lock.newCondition();
// 条件谓语: notEmpty (count > 0)
private final Condition notEmpty = lock.newCondition();


// 阻塞并直到: notFull
public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[tail] = x;
if (++tail == items.length)
tail = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

// 阻塞并直到: notEmpty
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
T x = items[head];
items[head] = null;
if (++head == items.length)
head = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}




AbstractQueuedSynchronizer(AQS)

抽象的队列式的同步器
很多同步类实现都与它相关如:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch、SynchronousQueue、FutureTask

内部维护有state,(FIFO线程等待队列)

AQS定义两种资源共享方式

  • Exclusive(独占,只有一个线程能执行,如ReentrantLock)
    • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
    • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)
    • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。当然也有都支持独占和共享的ReentrantReadWriteLock
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。
      此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。
      当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。
      但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

      CountDownLatch为例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。
      这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。
      等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

      Semaphore为例,初始化给予N个permit许可,每次acquire,state减1,直到state=0,则队列的任务就阻塞
      直到有release出现去递增state的数值

获取一个排他锁。

1
2
3
4
5
6
7
8
9
10
11
12
while(获取锁) {
if (获取锁成功) {
退出while循环
} else {
if(当前线程没有入队列) {
那么入队列
}
阻塞当前线程
}
}

获取锁成功->tryAcquire

释放一个排他锁。

1
2
3
4
5
6
if (释放锁成功) {
删除头结点
激活原头结点的后继节点
}

释放锁成功->tryRelease




实现流程-独占模式-ReentrantLock

1
AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现
1
2
3
4
5
6
7
sequenceDiagram
ReentrantLock->>自定义同步器FairSync: lock
自定义同步器FairSync->>AbstractQueuedSynchronizer:acquire(1)准备拿锁
AbstractQueuedSynchronizer->>自定义同步器FairSync:tryAcquire/尝试获取如重入,或者state=0
AbstractQueuedSynchronizer->>AbstractQueuedSynchronizer:addWaiter/添加Node节点进CLH队列
AbstractQueuedSynchronizer->>AbstractQueuedSynchronizer:acquireQueued/具体步骤
AbstractQueuedSynchronizer->>AbstractQueuedSynchronizer: selfInterrupt/自检中断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标记是否成功拿到资源
try {
boolean interrupted = false;//标记等待过程中是否被中断过

//又是一个“自旋”!
for (;;) {
final Node p = node.predecessor();//拿到前驱
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
failed = false;
return interrupted;//返回等待过程中是否被中断过
}

//如果自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
if (failed)
cancelAcquire(node);
}
}


实现流程-共享模式-CountDownLatch

1
2
3
4
5
sequenceDiagram
CountDownLatch->>CountDownLatch:await/自我等待
CountDownLatch->>Sync:tryAcquireSharedNanos/等待获取许可
CountDownLatch->>CountDownLatch:countDown/递减state数量
CountDownLatch->>Sync:releaseShared(1)/递减state数量1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}




实现流程-共享模式-Semaphore

NonfairSyncFairSync之分

1
2
3
4
5
6
7
8
9
10
11
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

独占模式ReentrantLock:state==0&&当前重入才可以获取锁,呈递增
共享模式Semaphore:state>0就可以获取锁,呈递减

当然你想自定义扩展自己的同步方式,可以直接编写tryAcquire、tryRelease、tryAcquireShared、tryReleaseShare

总结

以下几种都是内部

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
acquireQueued
doAcquireInterruptibly
doAcquireNanos
doAcquireShared
doAcquireSharedInterruptibly
doAcquireSharedNanos


try catch包裹,catch主要做cancelAquire
cancelAquire主要用来出现异常或获得执行权限的情况下取消掉这个节点

try内部有for循环包裹
那获取当前node的前驱节点,如果前驱节点是head,尝试去获取一下锁,tryAquire
获取成功则设置node为头节点,头的next=null,falid=false,再返回

如果tryAquire失败后,则shouldParkAfterFailedAcquire并parkAndCheckInterrupt
shouldParkAfterFailedAcquire主要做的逻辑是:
判断node前驱节点的waitStatus,一般有-1==SINGAL,0==初始化状态,1==CANCEL
如果初始化状态则设置-1,再内部循环一次,判断到-1==SINGAL,返回true,进行parkAndCheckInterrupt
如果判断出1则从链表消除这个前驱结点,然后再判断新的前驱结点状态

parkAndCheckInterrupt内部主要做LockSupport.park(this);阻塞当前线程,等待unpark解除阻塞
然后在返回线程是否中断Thread.interrupted();
如果中断则抛中断异常

doReleaseShared用来unparkSuccessor前面的节点解除阻塞状态

以下几种都是外部扩展

1
2
3
4
5
6
7
8
tryAcquire
tryRelease

tryAciquireShared
tryReleaseShared

公平与未公平主要在tryAcquire体现,如果用cas抢占获得执行权限则不公平
否则会判断当前未阻塞的执行线程节点是否为头节点,不是则不允许进行cas抢占

主要执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
RenntranLock->lock->aquire

先tryAquire不行再进入doAcquireInterruptibly等
doAcquireInterruptibly再tryAquire,不行再进入shouldParkAfterFailedAquire && parkAndCheckInterrupt....如果有interrupt则抛异常

有异常或者获得了执行权限去cancelAquire取消节点
unlock->tryRelease如果==0不允许重复释放,否则可以则unpark相关的前驱节点


CountDownLatch->await-->acquireSharedInterruptibly
如果tryAquireShared<0则进行doAquireShardInterruptibly与aquireQueue逻辑一致
countdown-->releaseShared主要目的是递减state,如果==0则不允许递减,成功递减unpark前驱节点线程


Semaphore->aquire->acquireSharedInterruptibly
如果tryAquireShared<0则进行doAquireShardInterruptibly与aquireQueue逻辑一致
release->releaseShared:主要目的是递增state


Semaphore的tryAquireShared主要是递减,tryReleaseShared主要是递增
CountDownLatch的tryAquireShared主要不做只判断state==0,tryReleaseShared主要做递减
ReentrantLock的tryAquire主要做递增(重入锁),tryRelease主要做递减(重入锁)

Syhchronized原理

线程的生命周期存在5个状态,start、running、waiting、blocking和dead

对于一个synchronized修饰的方法(代码块)来说:

  • 当多个线程同时访问该方法,那么这些线程会先被放进_EntryList队列,此时线程处于blocking状态

  • 当一个线程获取到了实例对象的监视器(monitor)锁,那么就可以进入running状态,执行方法,此时,ObjectMonitor对象的_owner指向当前线程,_count加1表示当前对象锁被一个线程获取

  • 当running状态的线程调用wait()方法,那么当前线程释放monitor对象,进入waiting状态,ObjectMonitor对象的_owner变为null,_count减1,同时线程进入_WaitSet队列,直到有线程调用notify()方法唤醒该线程,则该线程重新获取monitor对象进入_Owner区

  • 如果当前线程执行完毕,那么也释放monitor对象,进入waiting状态,ObjectMonitor对象的_owner变为null,_count减1

锁的优化

锁升级

锁的4中状态:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态(级别从低到高)

  • 1:偏向锁

为什么要引入偏向锁?

因为经过HotSpot的作者大量的研究发现,大多数时候是不存在锁竞争的,常常是一个线程多次获得同一个锁,因此如果每次都要竞争锁会增大很多没有必要付出的代价,为了降低获取锁的代价,才引入的偏向锁。

偏向锁的升级?

当线程1访问代码块并获取锁对象时,会在java对象头和栈帧中记录偏向的锁的threadID,因为偏向锁不会主动释放锁,因此以后线程1再次获取锁的时候,需要比较当前线程的threadID和Java对象头中的threadID是否一致,如果一致(还是线程1获取锁对象),则无需使用CAS来加锁、解锁;如果不一致(其他线程,如线程2要竞争锁对象,而偏向锁不会主动释放因此还是存储的线程1的threadID),那么需要查看Java对象头中记录的线程1是否存活,如果没有存活,那么锁对象被重置为无锁状态,其它线程(线程2)可以竞争将其设置为偏向锁;如果存活,那么立刻查找该线程(线程1)的栈帧信息,如果还是需要继续持有这个锁对象,那么暂停当前线程1,撤销偏向锁,升级为轻量级锁,如果线程1 不再使用该锁对象,那么将锁对象状态设为无锁状态,重新偏向新的线程。

偏向锁的取消?

偏向锁是默认开启的,而且开始时间一般是比应用程序启动慢几秒,如果不想有这个延迟,那么可以使用-XX:BiasedLockingStartUpDelay=0;

如果不想要偏向锁,那么可以通过-XX:-UseBiasedLocking = false来设置;

  • 2:轻量级锁

为什么要引入轻量级锁?

轻量级锁考虑的是竞争锁对象的线程不多,而且线程持有锁的时间也不长的情景。因为阻塞线程需要CPU从用户态转到内核态,代价较大,如果刚刚阻塞不久这个锁就被释放了,那这个代价就有点得不偿失了,因此这个时候就干脆不阻塞这个线程,让它自旋这等待锁释放。

轻量级锁什么时候升级为重量级锁?

线程1获取轻量级锁时会先把锁对象的对象头MarkWord复制一份到线程1的栈帧中创建的用于存储锁记录的空间(称为DisplacedMarkWord),然后使用CAS把对象头中的内容替换为线程1存储的锁记录(DisplacedMarkWord)的地址;

如果在线程1复制对象头的同时(在线程1CAS之前),线程2也准备获取锁,复制了对象头到线程2的锁记录空间中,但是在线程2CAS的时候,发现线程1已经把对象头换了,线程2的CAS失败,那么线程2就尝试使用自旋锁来等待线程1释放锁。

但是如果自旋的时间太长也不行,因为自旋是要消耗CPU的,因此自旋的次数是有限制的,比如10次或者100次,如果自旋次数到了线程1还没有释放锁,或者线程1还在执行,线程2还在自旋等待,这时又有一个线程3过来竞争这个锁对象,那么这个时候轻量级锁就会膨胀为重量级锁。重量级锁把除了拥有锁的线程都阻塞,防止CPU空转。

1
2
3
4
5
6
7
8
9
10
自适应自旋锁

自适应意味着自旋的时间不再固定了,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定

如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中
那么虚拟机就会认为这次自旋也很有可能再次成功,进而它将允许自旋等待持续相对更长的时间,比如100个循环。
相反的,如果对于某个锁,自旋很少成功获得过,那在以后要获取这个锁时将可能减少自旋时间甚至省略自旋过程,以避免浪费处理器资源。

自适应自旋解决的是“锁竞争时间不确定”的问题。JVM很难感知到确切的锁竞争时间,而交给用户分析就违反了JVM的设计初衷。
自适应自旋假定不同线程持有同一个锁对象的时间基本相当,竞争程度趋于稳定,因此,可以根据上一次自旋的时间与结果调整下一次自旋的时间。

锁消除

Java虚拟机在JIT编译时(可以简单理解为当某段代码即将第一次被执行时进行编译,又称即时编译)

通过对运行上下文的扫描,经过逃逸分析,去除不可能存在共享资源竞争的锁,通过这种方式消除没有必要的锁,可以节省毫无意义的请求锁时间

锁粗化

同步块的作用范围应该尽可能小,仅在共享数据的实际作用域中才进行同步,这样做的目的是为了使需要同步的操作数量尽可能缩小,缩短阻塞时间,如果存在锁竞争,那么等待锁的线程也能尽快拿到锁。

但是加锁解锁也需要消耗资源,如果存在一系列的连续加锁解锁操作,可能会导致不必要的性能损耗。

锁粗化就是将多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁,避免频繁的加锁解锁操作。