1.1.进程与线程

1 进程与线程

进程

  • 程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在 指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的 。
  • 当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
  • 进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器 等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)
  • 所赐进程是程序的⼀次执⾏过程,也可以说,进程是程序运行的一次实例。是系统运⾏程序的基本单位。在 Java 中,当我们启动 main 函数时其实就是启动了⼀个 JVM 的进程,⽽ main 函数所在的线 程就是这个进程中的⼀个线程,也称主线程

线程

  • 一个进程之内可以分为一到多个线程。

  • 一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行

  • Java 中,线程作为最小调度单位,进程作为资源分配的最小单位。 在 windows 中进程是不活动的,只是作 为线程的容器

二者对比

  • 进程基本上相互独立的,而线程存在于进程内,是进程的一个子集

  • 进程拥有共享的资源,如内存空间等,供其内部的线程共享

  • 进程间通信较为复杂

    • 同一台计算机的进程通信称为 IPC(Inter-process communication)
    • 不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP
  • 线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量

  • 线程更轻量,线程上下文切换成本一般上要比进程上下文切换低

说说线程的生命周期和状态?

Java 线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态:

  • NEW: 初始状态,线程被创建出来但没有被调用 start() 。
  • RUNNABLE: 运行状态,线程被调用了 start()等待运行的状态。
  • BLOCKED:阻塞状态,需要等待锁释放。
  • WAITING:等待状态,表示该线程需要等待其他线程做出一些特定动作(通知或中断)。
  • TIME_WAITING:超时等待状态,可以在指定的时间后自行返回而不是像 WAITING 那样一直等待。
  • TERMINATED:终止状态,表示该线程已经运行完毕。

img

2.3.2.2. 程序计数器为什么是私有的?

程序计数器主要有下⾯两个作⽤:

  1. 字节码解释器通过改变程序计数器来依次读取指令,从⽽实现代码的流程控制,如:顺序执 ⾏、选择、循环、异常处理。
  2. 在多线程的情况下,程序计数器⽤于记录当前线程执⾏的位置,从⽽当线程被切换回来的时 候能够知道该线程上次运⾏到哪⼉了。 需要注意的是,如果执⾏的是 native ⽅法,那么程序计数器记录的是 undefined 地址,只有执⾏ 的是 Java 代码时程序计数器记录的才是下⼀条指令的地址。

所以,程序计数器私有主要是为了线程切换后能恢复到正确的执⾏位置

2.3.2.3. 虚拟机栈和本地⽅法栈为什么是私有的

  • 虚拟机栈: 每个 Java ⽅法在执⾏的同时会创建⼀个栈帧⽤于存储局部变量表、操作数栈、 常量池引⽤等信息。从⽅法调⽤直⾄执⾏完成的过程,就对应着⼀个栈帧在 Java 虚拟机栈 中⼊栈和出栈的过程。
  • 本地⽅法栈: 和虚拟机栈所发挥的作⽤⾮常相似,区别是: 虚拟机栈为虚拟机执⾏ Java ⽅法 (也就是字节码)服务,⽽本地⽅法栈则为虚拟机使⽤到的**Native **⽅法服务。 在 HotSpot 虚拟机中和 Java 虚拟机栈合⼆为⼀。

所以,为了保证线程中的局部变量不被别的线程访问到,虚拟机栈和本地⽅法栈是线程私有的

2.2 并行与并发

并发: 同⼀时间段(重读),多个任务都在执⾏ (但是单位时间内不⼀定同时执⾏);

并⾏单位时间内,多个任务同时执⾏。

引用 Rob Pike 的一段描述:

​ 并发(concurrent)是同一时间应对(dealing with)多件事情的能力 。

​ 并行(parallel)是同一时间动手做(doing)多件事情的能力。

常用方法

方法 功能 说明
public void start() 启动一个新线程;Java虚拟机调用此线程的run方法 start 方法只是让线程进入就绪,里面代码不一定立刻 运行(CPU 的时间片还没分给它)。每个线程对象的 start方法只能调用一次,如果调用了多次会出现 IllegalThreadStateException
public void run() 线程启动后调用该方法 如果在构造 Thread 对象时传递了 Runnable 参数,则 线程启动后会调用 Runnable 中的 run 方法,否则默 认不执行任何操作。但可以创建 Thread 的子类对象, 来覆盖默认行为
public void setName(String name) 给当前线程取名字
public void getName() 获取当前线程的名字。线程存在默认名称:子线程是Thread-索引,主线程是main
public static Thread currentThread() 获取当前线程对象,代码在哪个线程中执行
public static void sleep(long time) 让当前线程休眠多少毫秒再继续执行。Thread.sleep(0) : 让操作系统立刻重新进行一次cpu竞争
public static native void yield() 提示线程调度器让出当前线程对CPU的使用 主要是为了测试和调试
public final int getPriority() 返回此线程的优先级
public final void setPriority(int priority) 更改此线程的优先级,常用1 5 10 java中规定线程优先级是1~10 的整数,较大的优先级 能提高该线程被 CPU 调度的机率
public void interrupt() 中断这个线程,异常处理机制
public static boolean interrupted() 判断当前线程是否被打断,清除打断标记
public boolean isInterrupted() 判断当前线程是否被打断,不清除打断标记
public final void join() 等待这个线程结束
public final void join(long millis) 等待这个线程死亡millis毫秒,0意味着永远等待
public final native boolean isAlive() 线程是否存活(还没有运行完毕)
public final void setDaemon(boolean on) 将此线程标记为守护线程或用户线程
public long getId() 获取线程长整型 的 id id 唯一
public state getState() 获取线程状态 Java 中线程状态是用 6 个 enum 表示,分别为: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED
public boolean isInterrupted() 判断是否被打 断 不会清除 打断标记

3.9 interrupt方法详解

Interrupt说明

interrupt的本质是将线程的打断标记设为true, t1.isInterrupted()(获取打断标记).并调用线程的三个parker对象(C++实现级别)unpark该线程。

基于以上本质,有如下说明:

  • 打断线程不等于中断线程,有以下两种情况:
    • 打断正在运行中的线程并不会影响线程的运行,但如果线程监测到了打断标记为true,可以自行决定后续处理。
    • 打断阻塞中的线程会让此线程产生一个InterruptedException异常,结束先程的运行。但如果该异常被线程捕获住,该线程依然可以自行决定后续处理(终止运行,继续运行,做一些善后工作等等)
方法 功能
public static boolean interrupted() 判断当前线程是否被打断,清除打断标记
public boolean isInterrupted() 判断当前线程是否被打断,不清除打断标记

打断 sleep,wait,join 的线程

这几个方法都会让线程进入阻塞状态 ,

打断 sleep ,wait,join 的线程, 会清空打断状态,让打断状态为false,以 sleep 为例,**一般用异常的方法控制线程**

* 模式之两阶段终止

Two Phase Termination 在一个线程 T1 中如何“优雅”终止线程 T2?这里的【优雅】指的是给 T2 一个料理后事的机会。

错误思路

  • 使用线程对象的 stop() 方法停止线程
    • stop 方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁, 其它线程将永远无法获取锁 出现死锁
  • 使用 System.exit(int) 方法停止线程
    • 目的仅是停止一个线程,但这种做法会让整个程序都停止

两阶段终止模式

利用 isInterrupted

interrupt 可以打断正在执行(不是运行)的线程,无论这个线程是在 sleep,wait,还是正常运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class TPTInterrupt {
private Thread thread;

//启动监控线程
public void start(){
thread = new Thread(() -> {
while(true) {
Thread current = Thread.currentThread();
if(current.isInterrupted()) {
log.debug("料理后事");
break;
}
try {
//为了不反复进入while循环
Thread.sleep(1000);
log.debug("将结果保存");
} catch (InterruptedException e) {
current.interrupt();
}
// 执行监控操作
}
},"监控线程");
thread.start();
}

//停止监控线程
public void stop() {
thread.interrupt();
}
}
利用停止标记
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 停止标记用 volatile 是为了保证该变量在多个线程之间的可见性
// 我们的例子中,即主线程把它修改为 true 对 t1 线程可见
class TPTVolatile {
private Thread thread;
private volatile boolean stop = false;
public void start(){
thread = new Thread(() -> {
while(true) {
Thread current = Thread.currentThread();
if(stop) {
log.debug("料理后事");
break;
}
try {
Thread.sleep(1000);
log.debug("将结果保存");
} catch (InterruptedException e) {
}
// 执行监控操作
}
},"监控线程");
thread.start();
}
public void stop() {
stop = true;
thread.interrupt();
}
}

打断 park 线程

打断 park 线程, 不会清空打断状态。也就是你打断了之后,状态还是true

但是如果你是wait,sleep,join那些,打断之后,其实应该是true,但是因为这些会清空打断状态,就会变成false

1
2
3
4
5
6
7
8
9
10
11
private static void test3() throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("park...");
LockSupport.park();
log.debug("unpark...");
log.debug("打断状态:{}", Thread.currentThread().isInterrupted());
}, "t1");
t1.start();
sleep(0.5);
t1.interrupt();
}

输出

1
2
3
21:11:52.795 [t1] c.TestInterrupt - park... 
21:11:53.295 [t1] c.TestInterrupt - unpark...
21:11:53.295 [t1] c.TestInterrupt - 打断状态:true

如果打断标记已经是 true, 则 park 会失效

1
2
3
4
5
6
7
8
9
10
11
12
private static void test4() {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
log.debug("park...");
LockSupport.park();
log.debug("打断状态:{}", Thread.currentThread().isInterrupted());
}
});
t1.start();
sleep(1);
t1.interrupt();
}

输出

1
2
3
4
5
6
7
8
9
10
21:13:48.783 [Thread-0] c.TestInterrupt - park... 
21:13:49.809 [Thread-0] c.TestInterrupt - 打断状态:true
21:13:49.812 [Thread-0] c.TestInterrupt - park...
21:13:49.813 [Thread-0] c.TestInterrupt - 打断状态:true
21:13:49.813 [Thread-0] c.TestInterrupt - park...
21:13:49.813 [Thread-0] c.TestInterrupt - 打断状态:true
21:13:49.813 [Thread-0] c.TestInterrupt - park...
21:13:49.813 [Thread-0] c.TestInterrupt - 打断状态:true
21:13:49.813 [Thread-0] c.TestInterrupt - park...
21:13:49.813 [Thread-0] c.TestInterrupt - 打断状态:true

提示

可以使用 Thread.interrupted() 返回状态后,清除打断状态

$\textcolor{Blue}{3.4 * 原理之线程运行} $

栈与栈帧

Java Virtual Machine Stacks (Java 虚拟机栈)

我们都知道 JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?其实就是线程,每个线程启动后,虚拟 机就会为其分配一块栈内存。

  • 每个栈由多个栈帧(Frame)组成,对应着每次方法调用时所占用的内存
  • 每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法

刚开始,加载方法

image-20240321232137004

执行method方法的准备工作

image-20240321232809506

image-20240321233016381

image-20240321233129935

线程上下文切换(Thread Context Switch)

为一些原因导致 cpu 不再执行当前的线程,转而执行另一个线程的代码 .需要由操作系统保存当前线程的状态,以 便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是⼀次 上下⽂切换

这里的原因是

  • 线程的 cpu 时间片用完
  • **垃圾回收 **
  • 有更高优先级的线程需要运行
  • 线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法

Java 中对应的概念 就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的

  • 状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址
  • Context Switch 频繁发生会影响性能

image-20240321233833682

死锁

概念:当一组线程发生死锁的情况下,该组死锁线程中的每一个线程都在等待该组中其它死锁线程的资源的释放

死锁产生的原因:

学过操作系统的朋友都知道产⽣死锁必须具备以下四个条件:

  1. 互斥条件:该资源任意⼀个时刻只由⼀个线程占⽤。

  2. 请求与保持条件:⼀个进程因请求资源⽽阻塞时,对已获得的资源保持不放。

  3. 不剥夺条件: 线程已获得的资源在末使⽤完之前不能被其他线程强⾏剥夺,只有⾃⼰使⽤完毕 后才释放资源。

  4. 循环等待条件:若⼲进程之间形成⼀种头尾相接的循环等待资源关系

如何避免线程死锁?

我上⾯说了产⽣死锁的四个必要条件,为了避免死锁,我们只要破坏产⽣死锁的四个条件中的其 中⼀个就可以了

  1. **破坏互斥条件 **:这个条件我们没有办法破坏,因为我们⽤锁本来就是想让他们互斥的(临界 资源需要互斥访问)。

  2. 破坏请求与保持条件 :⼀次性申请所有的资源。

  3. 破坏不剥夺条件 :已获得某部分资源的线程进⼀步申请其他资源时,如果申请不到,可以主动释 放它已获得的资源。

  4. 破坏循环等待条件 :靠按序申请资源来预防。按某⼀顺序申请资源,释放资源则反序释放。 破坏循环等待条件

image-20240409210548101

谈一谈synchronized

synchronized 关键字解决的是多个线程之间访问资源的同步性, 被它修饰的方法或者代码块在任意时刻只能有⼀个线程执行。

Java早期版本中,synchronized其实采用的是一个重量级锁的概念,但是在Java后期,我们对synchronized加入了大量的优化,如⾃旋锁、适 应性⾃旋锁、锁消除、锁粗化、偏向锁、轻量级锁等技术来减少锁操作的开销。

接下来我将着重介绍这些锁的基本原理。

首先我们需要明白Java的对象头,对于一个普通Java对象的对象头,他是由两部分组成的。分别是Mark Wrod 以及Klass word,其中Mark Word主要用于存储对象自身的运行时数据,如哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID、偏向时间戳等。而Klass word或者说类型指针,是对象指向其类元数据的指针,实际上是一个直接引用,(这里拓展一个知识点,关于类加载的机制,类加载机制的解析,就是将符号引用转化为直接引用,符号引用指向的是类的全限定符,也就是类的名字)

那么Java的对象头其实就可以表示当前加的锁是轻量级锁,还是偏向锁或者是重量锁。

1
2
3
4
5
|--------------------------------------------------------------|
| Object Header (64 bits) |
|------------------------------------|-------------------------|
| Mark Word (32 bits) | Klass Word (32 bits) |
|------------------------------------|-------------------------|

一般来说是这样子的,用最后三位来表示。000代表室轻量锁,010代表是重量锁,101则是偏向锁

1
2
3
4
5
6
7
8
9
10
11
12
13
|-------------------------------------------------------|--------------------|
| Mark Word (32 bits) | State |
|-------------------------------------------------------|--------------------|
| hashcode:25 | age:4 | biased_lock:0 | 01 | Normal |
|-------------------------------------------------------|--------------------|
|thread:23|epoch:2| age:4 | biased_lock:1 | 01 | Biased |
|-------------------------------------------------------|--------------------|
| ptr_to_lock_record:30 | 00 | Lightweight Locked |
|-------------------------------------------------------|--------------------|
| ptr_to_heavyweight_monitor:30 | 10 | Heavyweight Locked |
|-------------------------------------------------------|--------------------|
| | 11 | Marked for GC |
|-------------------------------------------------------|--------------------|

轻量锁

  • 创建锁记录(Lock Record)对象,每个线程都的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的 Mark Word

    image-20240327235700464

  • 让锁记录中 Object reference 指向锁对象,并尝试用 cas 替换 Object 的 Mark Word,将 Mark Word 的值存 入锁记录

    image-20240327235802620

  • 如果 cas 替换成功,对象头中存储了 锁记录地址和状态 00 ,表示由该线程给对象加锁,这时图示如下

    image-20240327235828136

  • 如果 cas 失败,有两种情况

    • 如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程
    • 如果是自己执行了 synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数

    image-20240328000531096

  • 当退出 synchronized 代码块(解锁时)如果有取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重 入计数减一

    image-20240328000542680

  • 当退出 synchronized 代码块(解锁时)锁记录的值不为 null,这时使用cas将Mark Word的值恢复给对象头

    • 成功,则解锁成功
    • 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

总结,每个线程里面都有一个Lock Record,锁记录。如果要申请这个某个对象的锁,那么让锁记录中 Object reference 指向锁对象,并尝试用 cas 替换 Object 的 Mark Word,将 Mark Word 的值存 入锁记录,将锁地址存入对象的MarkWord。cas成功就是代表获取锁成功了。但是如果是cas失败的话,一般来说就是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程。 或者如果是自己执行了 synchronized 锁重入,那么再添加一条Lock Record作为重入的计数

锁膨胀

如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有 竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。

1
2
3
4
5
6
static Object obj = new Object();
public static void method1() {
synchronized( obj ) {
// 同步块
}
}
  • 当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁

image-20240328000730832

  • 这时 Thread-1 加轻量级锁失败,进入锁膨胀流程
    • 即为 Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址
    • 然后自己进入 Monitor 的 EntryList BLOCKED

image-20240328001101686

  • 当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,失败。这时会进入重量级解锁 流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程

锁膨胀就是将轻量级锁升级为重量级锁,那么对于重量级锁来说,对象头要申请一个Monitor对象,翻译过来叫做监视器。这个监视器对象有WaitSet用来防止不满足条件的线程(后面会讲到),还有EntryList,这是一个等待队列,也就是阻塞队列,还有一个Owner,用来表示当前使用的线程。接上述讲的,如果因为竞争导致cas失败,那么他就会将已获得锁的线程指向Owner,然后将要竞争的锁放到EntryList,等待Owner线程释放锁。竞争失败的锁,这个时候新版JDK还加入了一个自旋锁,可以不断尝试获得锁

image-20240409224419554

偏向锁

可以看一下原文档。。。。

已经被废弃了目前

但是轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行 CAS 操作

所以我们中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现 这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有,这也就是偏向锁。在偏向锁没有被废除的时候,一般来说,我们获取锁获取的都是偏向锁。(不过刚开始偏向锁默认是延迟的,如果想要让他一开始就是偏向锁的,就要修改VM参数,当然了也可以禁用偏向锁。这边需要注意的是,偏向锁如果调用了HashCode的话,那么就不是偏向锁状态了。因为偏向锁的话,对象头前面需要一些位置来存线程的id,HashCode就没地方存了。)关于偏向锁,还有一个批量重偏向的优化。在介绍这个之前,首先,我们必须明确一点,在线程1有偏向锁的情况下,如果该对象锁再次被线程1获取了,此时应该是偏向锁,如果该锁释放了。这个过程叫做撤销偏向锁。然后如果此时线程2在去获取的话,因为有其它线程,锁应该升级成轻量级锁。但是如果线程1撤销偏向锁的次数超过一定阈值(20次),那么线程2取获取的时候,jvm会认为看来这个锁的主人不是线程1,所以这个时候线程2是可以获取偏向锁的。这个过程叫做批量重偏向。但是如果此时,线程2也撤销偏向锁超过了40次的阈值,那么此时JVM认为确实不应该偏向,此时线程3获取锁的时候,又只能是加上轻量级锁了。

最后还提供了一个锁消除的优化,

锁消除

可以把没有啥用的锁给优化掉,提高效率

ReentrantLock

ReentrantLock 是什么?

ReentrantLock 实现了 Lock 接口,是一个可重入且独占式的锁,synchronized 关键字类似。不过,ReentrantLock 更灵活、更强大,增加了轮询、超时、中断、公平锁和非公平锁等高级功能。

1
public class ReentrantLock implements Lock, java.io.Serializable {}

ReentrantLock 里面有一个内部类 SyncSync 继承 AQS(AbstractQueuedSynchronizer),添加锁和释放锁的大部分操作实际上都是在 Sync 中实现的。Sync 有公平锁 FairSync 和非公平锁 NonfairSync 两个子类

公平锁和非公平锁有什么区别?

  • 公平锁 : 锁被释放之后,先申请的线程先得到锁。性能较差一些,因为公平锁为了保证时间上的绝对顺序,上下文切换更频繁。
  • 非公平锁:锁被释放之后,后申请的线程可能会先获取到锁,是随机或者按照其他优先级排序的。性能更好,但可能会导致某些线程永远无法获取到锁(饥饿)

synchronized 和 ReentrantLock 有什么区别?

两者都是可重入锁

可重入锁 也叫递归锁,指的是线程可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,此时这个对象锁还没有释放,当其再次想要获取这个对象的锁的时候还是可以获取的,如果是不可重入锁的话,可能会造成死锁

synchronized 依赖于 JVM 而 ReentrantLock 依赖于 AP

synchronized 是依赖于 JVM 实现的,前面我们也讲到了 虚拟机团队在 JDK1.6 为 synchronized 关键字进行了很多优化,但是这些优化都是在虚拟机层面实现的,并没有直接暴露给我们。

ReentrantLock 是 JDK 层面实现的(也就是 API 层面,需要 lock() 和 unlock() 方法配合 try/finally 语句块来完成),所以我们可以通过查看它的源代码,来看它是如何实现的。

ReentrantLock 比 synchronized 增加了一些高级功能

相比synchronizedReentrantLock增加了一些高级功能。主要来说主要有三点:

  • 等待可中断 : ReentrantLock提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly() 来实现这个机制。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。
  • 可实现公平锁 : ReentrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。ReentrantLock默认情况是非公平的,可以通过 ReentrantLock类的ReentrantLock(boolean fair)构造方法来指定是否是公平的。
  • 可实现选择性通知(锁可以绑定多个条件): synchronized关键字与wait()notify()/notifyAll()方法相结合可以实现等待/通知机制。ReentrantLock类当然也可以实现,但是需要借助于Condition接口与newCondition()方法。(条件变量)
1
2
3
4
private final Condition condition = lock.newCondition();  
private final Condition condition1 = lock.newCondition();
condition.await();
condition.sigal();

可中断锁和不可中断锁有什么区别?

  • 可中断锁获取锁的过程中可以被中断,不需要一直等到获取锁之后 才能进行其他逻辑处理。ReentrantLock 就属于是可中断锁。

  • 不可中断锁一旦线程申请了锁,就只能等到拿到锁以后才能进行其他的逻辑处理。 synchronized 就属于是不

1
2
3
4
创建一个ReentrantLock对象。
在需要获取锁的线程中,使用lockInterruptibly()方法尝试获取锁。
如果在获取锁的过程中线程被中断(即其他线程调用了该线程的interrupt()方法),那么lockInterruptibly()方法会抛出InterruptedException异常。
在捕获到InterruptedException异常后,你可以执行一些清理工作(如释放已经获取的其他资源),然后结束线程或者重新尝试获取锁。

4.7 wait notify

image-20240409233142780

* 原理之 wait / notify

image-20240323223957890

  • Owner 线程发现条件不满足(调用 wait 方法),即可进入 WaitSet 变为 WAITING 状态
  • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
  • BLOCKED 线程会在 Owner 线程释放锁时唤醒
  • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入 EntryList 重新竞争

Thread#sleep() 方法和 Object#wait() 方法对比

共同点:两者都可以暂停线程的执行。

区别

  • sleep() 方法没有释放锁,而 wait() 方法释放了锁
  • wait() 通常被用于线程间交互/通信,sleep()通常被用于暂停执行。
  • wait() 方法被调用后,线程不会自动苏醒,需要别的线程调用同一个对象上的 notify()或者 notifyAll() 方法。sleep()方法执行完成后,线程会自动苏醒,或者也可以使用 wait(long timeout) 超时后线程会自动苏醒。
  • sleep()Thread 类的静态本地方法,wait() 则是 Object 类的本地方法。为什么这样设计呢?下一个问题就会聊到

Join

在Java中,==Thread 类的 join() 方法用于使当前线程等待,直到调用 join() 方法的线程执行完毕。当在一个线程中调用另一个线程的 join() 方法时,当前线程会暂停执行,直到被 join() 的线程执行完毕,当前线程才会继续执行。

使用 join() 方法的一个常见场景是,当一个线程需要等待另一个线程完成某个任务后再继续执行时。

以下是 join() 方法的基本用法示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class JoinExample {  
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println("Thread is running: " + i);
try {
Thread.sleep(1000); // 模拟线程执行耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});

// 启动线程
thread.start();

// 当前线程等待thread线程执行完毕
thread.join();

// thread线程执行完毕后,当前线程继续执行
System.out.println("Main thread is continuing...");
}
}

在上面的示例中,我们创建了一个新的线程 thread,并在 main 线程中调用了 thread.start() 来启动它。然后,我们在 main 线程中调用了 thread.join(),这会使 main 线程暂停执行,直到 thread 线程执行完毕。当 thread 线程执行完毕后,main 线程会继续执行,并输出 “Main thread is continuing…”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class GuardedObjectV2 {
private Object response;
private final Object lock = new Object();
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
//millis是设置的超时时间
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
log.debug("waitTime: {}", waitTime);
if (waitTime <= 0) {
log.debug("break...");
break;
}
try {
//通过while循环不断设置超时的时间
lock.wait(waitTime); //可能被提前虚假唤醒,但是没有response
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - begin;
log.debug("timePassed: {}, object is null {}",
timePassed, response == null);
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
log.debug("notify...");
lock.notifyAll();
}
}
}

Park和unPark

基本使用

它们是 LockSupport 类中的方法

1
2
3
4
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

先 park 再 unpark

1
2
3
4
5
6
7
8
9
10
11
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
},"t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

输出

1
2
3
4
18:42:52.585 c.TestParkUnpark [t1] - start... 
18:42:53.589 c.TestParkUnpark [t1] - park...
18:42:54.583 c.TestParkUnpark [main] - unpark...
18:42:54.583 c.TestParkUnpark [t1] - resume...

先 unpark 再 park

1
2
3
4
5
6
7
8
9
10
11
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(2);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

输出

1
2
3
4
18:43:50.765 c.TestParkUnpark [t1] - start... 
18:43:51.764 c.TestParkUnpark [main] - unpark...
18:43:52.769 c.TestParkUnpark [t1] - park...
18:43:52.769 c.TestParkUnpark [t1] - resume...

与 Object 的 wait & notify 相比

  • wait,notify 和 notifyAll 必须配合 Object Monitor(必须配合锁使用) 一起使用,而 park,unpark 不
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

$\textcolor{blue}{* 原理之park和unpark}$

每个线程都有自己的一个Parker 对象(由C++编写,java中不可见),由三部分组成 _counter _cond _mutex 打个比喻

  • 线程就像一个旅人,Parker 就像他随身携带的背包,条件变量就好比背包中的帐篷。_counter 就好比背包中 的备用干粮(0 为耗尽,1 为充足)
  • 调用 park 就是要看需不需要停下来歇息
    • 如果备用干粮耗尽,那么钻进帐篷歇息
    • 如果备用干粮充足,那么不需停留,继续前进
  • 调用 unpark,就好比令干粮充足
    • 如果这时线程还在帐篷,就唤醒让他继续前进
    • 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
      • 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮

image-20240328171036713

  1. 当前线程调用 Unsafe.park() 方法
  2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁
  3. 线程进入 _cond 条件变量阻塞
  4. 设置 _counter = 0

image-20220304144548871

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 唤醒 _cond 条件变量中的 Thread_0
  3. Thread_0 恢复运行
  4. 设置 _counter 为 0

image-20220304144626180

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 当前线程调用 Unsafe.park() 方法
  3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
  4. 设置 _counter 为 0

CAS算法

CAS 的全称是 Compare And Swap(比较与交换)用于实现乐观锁,被广泛应用于各大框架中。CAS 的思想很简单,就是用一个预期值要更新的变量值进行比较,两值相等才会进行更新。

CAS 是一个原子操作,底层依赖于一条 CPU 的原子指令。

原子操作 即最小不可拆分的操作,也就是说操作一旦开始,就不能被打断,直到操作完成。

CAS 涉及到三个操作数:

  • V:要更新的变量值(Var)
  • E:预期值(Expected)
  • N:拟写入的新值(New)

当且仅当 V 的值等于 E 时,CAS 通过原子方式用新值 N 来更新 V 的值。如果不等,说明已经有其它线程更新了 V,则当前线程放弃更新。

举一个简单的例子:线程 A 要修改变量 i 的值为 6,i 原值为 1(V = 1,E=1,N=6,假设不存在 ABA 问题)。

  1. i 与 1 进行比较,如果相等, 则说明没被其他线程修改,可以被设置为 6 。
  2. i 与 1 进行比较,如果不相等,则说明被其他线程修改,当前线程放弃更新,CAS 操作失败。

当多个线程同时使用 CAS 操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。

Java 语言并没有直接实现 CAS,CAS 相关的实现是通过** C++ 内联汇编的形式实现的(JNI 调用)**。因此, CAS 的具体实现和操作系统以及 CPU 都有关系。

sun.misc包下的Unsafe类提供了compareAndSwapObjectcompareAndSwapIntcompareAndSwapLong方法来实现的对Objectintlong类型的 CAS 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* CAS
* @param o 包含要修改field的对象
* @param offset 对象中某field的偏移量
* @param expected 期望值
* @param update 更新值
* @return true | false
*/
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object update);

public final native boolean compareAndSwapInt(Object o, long offset, int expected,int update);

public final native boolean compareAndSwapLong(Object o, long offset, long expected, long update);

ABA问题

ABA 问题是 CAS 算法最常见的问题。

如果一个变量 V 初次读取的时候是 A 值,并且在准备赋值的时候检查到它仍然是 A 值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回 A,那 CAS 操作就会误认为它从来没有被修改过。这个问题被称为 CAS 操作的 “ABA”问题。

ABA 问题的解决思路是在变量前面追加上版本号或者时间戳。JDK 1.5 以后的 AtomicStampedReference 类就是用来解决 ABA 问题的,其中的 compareAndSet() 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

对JMM的理解

在 JDK1.2 之前,Java 的内存模型实现总是从主存(即共享内存)读取变量,是不需要进⾏特别 的注意的。⽽在当前的 Java 内存模型下,线程可以把变量保存本地内存(⽐如机器的寄存器) 中,⽽不是直接在主存中进⾏读写。这就可能造成⼀个线程在主存中修改了⼀个变量的值,⽽另 外⼀个线程还继续使⽤它在寄存器中的变量值的拷⻉造成数据的不⼀致

由于计算机硬件底层的内存结构过于复杂,JMM 即 Java Memory Model,他的意义在于避免程序员直接管理计算机底层内存,用一些关键字synchronized、volatile等可以方便的管理内存。

JMM 体现在以下几个方面

  • 原子性 - 保证指令不会受到线程上下文切换的影响 ,就是操作肯定要全部完成的意思,类似于事务
  • 可见性 - 保证指令不会受 cpu 缓存(本地内存)的影响
  • 有序性 - 保证指令不会受 cpu 指令并行优化的影响

ThreadLocal 有什么用?

通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。如果想实现每一个线程都有自己的专属本地变量该如何解决呢?

JDK 中自带的ThreadLocal类正是为了解决这样的问题。 ThreadLocal类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。

如果你创建了一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的本地副本,这也是ThreadLocal变量名的由来。他们可以使用 get()set() 方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。

ThreadLocal 的内部结构

JDK1.7 之前的设计

image.png

JDK1.7 之后的设计

image.png

修改设计后的好处

  1. 每个Map 存储 Entry 的数量变少:就会尽量避免 Hash 冲突了,这样效率就变高了。
  2. 当 Thread 销毁的时候,ThreadLocal 也会随之销毁,减少内存的使用。反观 早先设计,Thread 结束,ThreadLocalMap 依然存在。

关于ThreadLocal的内存泄漏的相关概念

![image.png](https://gitee.com/jacksonzhang1014/blog-image/raw/master/image 6.png)

ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,指向ThreadLocal引用,而 value 是强引用,指向当前线程对应的ThreadLocalMap

(1)内存泄漏相关概念

  • Memory overflow:内存溢出,没有足够的内存给内存的申请者使用。

  • Memory leak:内存泄漏指的是程序中已动态分配的堆内存由于某种原因未释放 或者 无法释放,造成系统内存的浪费,导致程序运行速度减慢甚至系统崩溃等严重后果。内存泄漏的堆积导致内存溢出。

为什么会导致内存泄漏?

比较以上两种情况,我们就会发现,内存泄漏的发生 跟 ThreadLocalMap 当中的key 是强还是弱引用没有关系,而造成真正的内存泄漏的原因如下:

  1. 没有手动删除这个Entry. (table 数组指定位置置为null, 断掉强引用链)

  2. CurrentThread 依然运行。(Current Thread 销毁,则指向 ThreadLocalMap的引用链就断了,进而引发后续链条上全部对象GC)

综上:ThreadLocal 内存泄漏的根源是:由于 ThreadLocalMap 的生命周期跟 Thread 导致的(如果存在线程池,Thread 复用,则这个 ThreadLocalMap 永远不会销毁掉),如果没有手动删除对应的key,就会导致内存泄漏。

(6)为什么弱引用会导致内存泄漏,还是要使用弱引用呢:

事实上,ThreadLocalMap 中的 set/getEntry 方法中,会对 key 为 null (即 ThreadLocal 为null)进行判断,如果为 null 的话,那么 value 也会置为 null,value==null的话,value之前对应的内存就可以被释放

这就意味着 使用完 ThreadLocal, CurrentThread 依然运行的前提下,就算忘记调用 remove 方法,弱引用可以比强引用多一层保障:弱引用的 ThreadLocal 会被回收,对应的value在下一次调用 set、get、remove 中任何一个方法的时候会被清除,从而避免内存泄漏。

volatile VS synchronized

volatile,它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作 volatile 变量都是直接操作主存

synchronized 关键字和 volatile关键字是两个互补的存在,⽽不是对⽴的存在!

  • volatile 关键字是线程同步的轻量级实现,所以 要好。但是volatile性能肯定⽐ synchronized 关键字好。 volatil 关键字只能⽤于变量⽽ synchronized 关键字可以修饰⽅法以及代码 块
  • volatile 关键字能保证数据的可⻅性,但不能保证数据的原⼦性
  • synchronized 关键字两 者都能保证。
  • volatile 关键字主要⽤于解决变量在多个线程之间的可⻅性,⽽ synchronized 关键字解决 的是多个线程之间访问资源的同步性

注意

synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是 synchronized 是属于重量级操作,性能相对更低 。

JMM关于synchronized的两条规定:

  1)线程解锁前,必须把共享变量的最新值刷新到主内存中

  2)线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新获取最新的值

   (注意:加锁与解锁需要是同一把锁)

通过以上两点,可以看到synchronized能够实现可见性。同时,由于synchronized具有同步锁,所以它也具有原子性

如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到 对 run 变量的修改了,想一想为什么?(println方法中有synchronized代码块保证了可见性)

synchronized关键字不能阻止指令重排,但在一定程度上能保证有序性(如果共享变量没有逃逸出同步代码块的话)。因为在单线程的情况下指令重排不影响结果,相当于保障了有序性。

* 原理之 volatile

volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)

  • 对 volatile 变量的写指令后会加入**写屏障 **
  • 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
  • 对 volatile 变量的读指令前会加入读屏障
  • 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据

单例模式

当频繁实例化一个对象并且该对象消耗大量资源或频繁访问共享资源时,单例模式保证一个类仅有一个实例,并提供一个访问它的全局访问点。

单例模式有很多实现方法,饿汉、懒汉、静态内部类、枚举类,试分析每种实现下获取单例对象(即调用 getInstance)时的线程安全,并思考注释中的问题

饿汉式:类加载就会导致该单实例对象被创建

懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建

如何判断这个单例对象有没有被创建,主要就是看INSTANCE有没有被初始化,如果INSTANCE被初始化,我们认为,单例对象已经被创建。并且首次使用对象的时候,会调用getInstance获取实例对象。之后,每次使用对象,也会调用getInstance。

饿汉式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Singleton {  
// 静态初始化器,由JVM保证只初始化一次
private static final Singleton INSTANCE = new Singleton();

// 私有构造函数,防止外部通过new Singleton()创建对象
private Singleton() {}

// 提供静态的getInstance()方法获取单例对象
public static Singleton getInstance() {
return INSTANCE;
}

// 其他方法...
}

懒汉式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

//单例模式,单例模式又称dcl,double checking lock
//Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做 了,直接结束返回
public final class Singleton {
private Singleton() { }
private static volatile Singleton INSTANCE = null;
public static Singleton getInstance() {
// 实例没创建,才会进入内部的 synchronized代码块
if (INSTANCE == null) {
synchronized (Singleton.class) { // t2
// 也许有其它线程已经创建实例,所以再判断一次
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
//错误就是在这里,指令重排可能会导致先执行赋值。在进行创建实例对象,所以必须使用volatile保证有序性
}
}
}
return INSTANCE; //访问静态变量不需要加this
}
}

为什么不直接加sync锁,而要在之前使用if(instance == null)先判断?因为,我们每次对象的时候,都会调用getInstance方法,那么我们每次都要去获取锁,这样的开销是很大的。

那为什么加完sync锁之后,还要加if(instance == null)进行判断?因为有可能大量线程同时进入了第一个if语句,但是只能一个进去,其它的都被阻塞在了sync锁,不过只是阻塞,当一个线程新建完线程之后,那些阻塞的线程还是可以进去sync代码块,这就有可能又新建对象了

3.静态内部类:利用JVM的类加载机制来保证初始化instance时只有一个线程。

1
2
3
4
5
6
7
8
9
10
11
public class Singleton {  
private Singleton() {}

private static class SingletonHolder {
private static final Singleton INSTANCE = new Singleton();
}

public static Singleton getInstance() {
return SingletonHolder.INSTANCE;
}
}

4.*枚举**:在Java中,枚举类型也是线程安全的单例实现方式。

1
2
3
4
5
public enum Singleton {  
INSTANCE;

// 其他方法...
}

介绍⼀下 Atomic 原⼦类

Atomic 翻译成中⽂是原⼦的意思。Atomic 是指⼀个操作是不可中断的。即使是在多个线程⼀ 起执⾏的时候,⼀个操作⼀旦开始,就不会被其他线程⼲扰。 并发包 java.util.concurrent的原⼦类都存放在 java.util.concurrent.atomic 下,实际上是采用CAS的方式如下图所示

  • 基本类型

使⽤原⼦的⽅式更新基本类型

AtomicInteger :整形原⼦类

AtomicLong :⻓整型原⼦类

AtomicBoolean :布尔型原⼦

  • 数组类型

使⽤原⼦的⽅式更新数组⾥的某个元素

AtomicIntegerArray :整形数组原⼦类

AtomicLongArray :⻓整形数组原⼦类

AtomicReferenceArray :引⽤类型数组原⼦

  • 引⽤类型

AtomicReference :引⽤类型原⼦类

AtomicStampedReference :原⼦更新带有版本号的引⽤类型。该类将整数值与引⽤关联起 来,可⽤于解决原⼦的更新数据和数据的版本号,可以解决使⽤ CAS 进⾏原⼦更新时可能 出现的 ABA 问题。

AtomicMarkableReference :原⼦更新带有标记位的引⽤类

对象的属性修改类型

AtomicIntegerFieldUpdater :原⼦更新整形字段的更新器

AtomicLongFieldUpdater :原⼦更新⻓整形字段的更新器

AtomicReferenceFieldUpdater :原⼦更新引⽤类型字段的更新

线程池的使用

为什么要使用线程池?

  • 降低资源消耗。通过重复利⽤已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能⽴即执⾏。

  • 提高线程的可管理性。线程是稀缺资源,如果⽆限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使⽤线程池可以进⾏统⼀的分配,调优和监控

实现 Runnable 接⼝和 Callable 接⼝的区别

Runnable ⾃ Java 1.0 以来⼀直存在,但 Callable 仅在 Java 1.5 中引⼊,⽬的就是为了来处 理Runna ble 不⽀持的⽤例。 Runnable 接⼝不会返回结果或抛出检查异常,但是 Callable可以。所以,如果任务不需要返回结果或抛出异常推荐使⽤Runnable 接⼝,这样代码看起来会 更加简洁

执⾏ execute()⽅法和 submit()⽅法的区别是什么呢?

  1. **execute() ⽅法⽤于提交不需要返回值的任务,所以⽆法判断任务是否被线程池执⾏成功与 否; **
  2. submit() ⽅法⽤于提交需要返回值的任务。线程池会返回⼀个 这个 Future 类型的对象,通过 Future 对象可以判断任务是否执⾏成功,并且可以通过 Futureget() ⽅法来获取 返回值,get()⽅法会阻塞当前线程直到任务完成,⽽使⽤get (long timeout TimeUnit )⽅法则会阻塞当前线程⼀段时间后⽴即返回,这时候有可能任务没有执⾏完

如何使用线程池

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。

  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排 队,直到有空闲的线程。

  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。

  • 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它 著名框架也提供了实现

  • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略

  • CallerRunsPolicy 让调用者运行任务

  • DiscardPolicy 放弃本次任务

  • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

  • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题

  • Netty 的实现,是创建一个新线程来执行任务

  • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略

  • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

  • 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。

我先介绍一下线程池的基本概念。线程池中,有两个线程概念。一种是核心线程,一种是救急线程。核心线程的数量加上救急线程的数量等价于该线程池的最大线程数量,在线程池当中还有一个阻塞队列。一般来说,当任务加进来的时候,会先使用核心线程当中的线程,如果核心线程中的线程不够用了,会先把任务放进阻塞队列当中,如果阻塞队列也放满了(注意我这边的阻塞队列选的是有界的),那么就会使用救急线程。如果救急线程也放满了,那么就会采用拒绝策略。一共有四种拒绝策略….

  • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
  • CallerRunsPolicy 让调用者运行任务 (它的意思是当线程池中的线程都已被占用,并且等待队列也满了时,新的任务将由调用者所在的线程来执行,而不是直接丢弃任务或抛出异常。)
  • DiscardPolicy 放弃本次任务
  • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

《阿⾥巴巴 Java 开发⼿册》中强制线程池不允许使⽤ Executors 去创建,⽽是通过
ThreadPoolExecutor 的⽅式,这样的处理⽅式让写的同学更加明确线程池的运⾏规则,规避资源
耗尽的⻛险

1. 使用Executors工厂类

通过 Executor 框架的⼯具类 Executors 来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());
});
}

executor.shutdown(); // 记住要关闭线程池
}
}

2.使用ThreadPoolExcutor线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.BlockingQueue;  
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorExample {
public static void main(String[] args) {
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 60L;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);

ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue
);

for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());
});
}

executor.shutdown(); // 记住要关闭线程池
}
}

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 核心线程,任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数 :

  • keepAliveTime:线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁。(救急线程的回收时间)
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :executor 创建新线程的时候会用到。
  • handler :饱和策略(后面会单独详细介绍一下)。

线程池的阻塞队列

新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

不同的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。

  • ArrayBlockingQueuerp

    • 基于数组结构的有界阻塞队列。
    • 当队列为空时,尝试获取元素的线程会被阻塞,直到有元素可获取。
    • 当队列已满时,尝试添加元素的线程也会被阻塞,直到队列有空余空间。
    • 它按照先进先出(FIFO)的原则对元素进行排序。
  • 容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列):FixedThreadPoolSingleThreadExectorFixedThreadPool最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExector只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。

  • SynchronousQueue(同步队列):CachedThreadPoolSynchronousQueue 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,CachedThreadPool 的最大线程数是 Integer.MAX_VALUE ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。

  • DelayedWorkQueue(延迟阻塞队列):ScheduledThreadPoolSingleThreadScheduledExecutorDelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。

有以下三种线程池(阿里巴巴不让用)

1.newFixedThreadPool

该方法放回一个固定数量的线程

  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务

评价 适用于任务量已知,相对耗时的任务

2.CachedThreadPool

特点

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,
    • 意味着全部都是救急线程(60s 后可以回收)
    • 救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)

评价 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线 程。 适合任务数比较密集,但每个任务执行时间较短的情况

3.SingleThreadExecutor

希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程 也不会被释放。

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一 个线程,保证池的正常工作
  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
    • FinalizableDelegatedExecutorService 应用的是装饰器模式,在调用构造方法时将ThreadPoolExecutor对象传给了内部的ExecutorService接口。只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法,也不能重新设置线程池的大小。
  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
    • 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
  • 即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。

image-20240410211122310

  1. FixedThreadPool:固定大小的线程池。

    1
    2
    3
    4
    ExecutorService executor = Executors.newFixedThreadPool(int nThreads);  
    executor.execute(new RunnableTask()); // 提交任务
    // ...
    executor.shutdown(); // 关闭线程池
  2. CachedThreadPool:可缓存的线程池,如果线程池当前线程数小于核心线程数,则创建新线程执行任务。当线程数大于核心线程数时,缓存空闲线程。当线程数大于最大线程数时,则通过队列缓存新任务。

    1
    2
    3
    4
    ExecutorService executor = Executors.newCachedThreadPool();  
    executor.execute(new RunnableTask()); // 提交任务
    // ...
    executor.shutdown(); // 关闭线程池

    注意CachedThreadPool没有设置线程数的上限(实际上是Integer.MAX_VALUE),可能会导致创建大量线程,耗尽系统资源。

  3. SingleThreadExecutor:单线程的线程池,它用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)一个接一个地执行。

    1
    2
    3
    4
    ExecutorService executor = Executors.newSingleThreadExecutor();  
    executor.execute(new RunnableTask()); // 提交任务
    // ...
    executor.shutdown(); // 关闭线程池

使用SpringBoot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ThreadPoolConfig {

@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(5);
// 设置最大线程数
executor.setMaxPoolSize(10);
// 设置队列容量
executor.setQueueCapacity(25);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(60);
// 设置线程名前缀
executor.setThreadNamePrefix("taskExecutor-");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化线程池
executor.initialize();
return executor;
}
}

如何根据任务的优先级来执行线程池

这是一个常见的面试问题,本质其实还是在考察求职者对于线程池以及阻塞队列的掌握。

我们上面也提到了,不同的线程池会选用不同的阻塞队列作为任务队列,比如FixedThreadPool 使用的是LinkedBlockingQueue(无界队列),由于队列永远不会被放满,因此FixedThreadPool最多只能创建核心线程数的线程。

假如我们需要实现一个优先级任务线程池的话,那可以考虑使用 PriorityBlockingQueue (优先级阻塞队列)作为任务队列(ThreadPoolExecutor 的构造函数有一个 workQueue 参数可以传入任务队列)。

ThreadPoolExecutor构造函数

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,可以看作是线程安全的 PriorityQueue,两者底层都是使用小顶堆形式的二叉堆,即值最小的元素优先出队。不过,PriorityQueue 不支持阻塞操作。

要想让 PriorityBlockingQueue 实现对任务的排序,传入其中的任务必须是具备排序能力的,方式有两种:

  1. 提交到线程池的任务实现 Comparable 接口,并重写 compareTo 方法来指定任务之间的优先级比较规则。
  2. 创建 PriorityBlockingQueue 时传入一个 Comparator 对象来指定任务之间的排序规则(推荐)。

不过,这存在一些风险和问题,比如:

  • PriorityBlockingQueue 是无界的,可能堆积大量的请求,从而导致 OOM。
  • 可能会导致饥饿问题,即低优先级的任务长时间得不到执行。
  • 由于需要对队列中的元素进行排序操作以及保证线程安全(并发控制采用的是可重入锁 ReentrantLock),因此会降低性能。

对于 OOM 这个问题的解决比较简单粗暴,就是继承PriorityBlockingQueue 并重写一下 offer 方法(入队)的逻辑,当插入的元素数量超过指定值就返回 false 。

饥饿问题这个可以通过优化设计来解决(比较麻烦),比如等待时间过长的任务会被移除并重新添加到队列中,但是优先级会被提升。

对于性能方面的影响,是没办法

AQS

概念

AQS 的全称为( AbstractQueuedSynchronizer ),是阻塞式锁和相关的同步器工具的框架 ,这个类在 java.util.concurrent.locks 包下⾯。

原理:AQS 核⼼思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的⼯作线 程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占⽤,那么就需要⼀套线程阻塞 等待以及被唤醒时锁分配的机制,这个机制 AQS 是⽤ CLH 队列锁实现的,即将暂时获取不到锁 的线程加⼊到队列中

CLH 好处:

  • 无锁,使用自旋
  • 快速,无阻塞

AQS 对资源的共享⽅式

  • Exclusive(独占):只有⼀个线程能执⾏,如 ReentrantLock 。⼜可分为公平锁和非公平锁:
    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
    • ⾮公平锁:当线程要获取锁时,⽆视队列顺序直接去抢锁,谁抢到就是谁的
  • Share(共享):多个线程可同时执⾏,如 CountDownLatch SemaphoreCountDownLatchCyclicBarrierReadWriteLock 我们都会在后⾯讲到。

ReentrantReadWriteLock课以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多 个线程同时对某⼀资源进⾏读。

  • 不同的⾃定义同步器争⽤共享资源的⽅式也不同。⾃定义同步器在实现时只需要实现共享资源 state 的获取与释放⽅式即可,⾄于具体线程等待队列的维护(如获取资源失败⼊队/唤醒出队 等),AQS 已经在顶层实现好了

AQS 使用 int 成员变量 state 表示同步状态,通过内置的 线程等待队列 来完成获取资源线程的排队工作。

state 变量由 volatile 修饰,用于展示当前临界资源的获锁情况。

1
2
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;

另外,状态信息 state 可以通过 protected 类型的getState()setState()compareAndSetState() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。


1
2
3
4
5
6
7
8
9
10
11
12
13
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}


ReentrantLock 为例,state 初始值为 0,表示未锁定状态。A 线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state+1 。此后,其他线程再 tryAcquire() 时就会失败,直到 A 线程 unlock()state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。

再以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后countDown() 一次,state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 state=0 ),会 unpark() 主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作。

AQS组件

ReentrantLock

ReentrantLock 实现了 Lock 接口,是一个可重入且独占式的锁,和 synchronized 关键字类似。不过,ReentrantLock 更灵活、更强大,增加了轮询、超时、中断、公平锁和非公平锁等高级功能。

1
public class ReentrantLock implements Lock, java.io.Serializable {}

ReentrantLock 里面有一个内部类 SyncSync 继承 AQS(AbstractQueuedSynchronizer),添加锁和释放锁的大部分操作实际上都是在 Sync 中实现的。Sync 有公平锁 FairSync 和非公平锁 NonfairSync 两个子类。

img

ReentrantLock 默认使用非公平锁,也可以通过构造器来显式的指定使用公平锁。

1
2
3
4
5
// 传入一个 boolean 值,true 时为公平锁,false 时为非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

从上面的内容可以看出, ReentrantLock 的底层就是由 AQS 来实现的。关于 AQS 的相关内容推荐阅读 AQS 详解open in new window 这篇文章。

公平锁和非公平锁有什么区别?

  • 公平锁 : 锁被释放之后,先申请的线程先得到锁。性能较差一些,因为公平锁为了保证时间上的绝对顺序,上下文切换更频繁。
  • 非公平锁:锁被释放之后,后申请的线程可能会先获取到锁,是随机或者按照其他优先级排序的。性能更好,但可能会导致某些线程永远无法获取到锁。

synchronized 和 ReentrantLock 有什么区别?

两者都是可重入锁

synchronized 依赖于 JVM 而 ReentrantLock 依赖于 API

synchronized 是依赖于 JVM 实现的,前面我们也讲到了 虚拟机团队在 JDK1.6 为 synchronized 关键字进行了很多优化,但是这些优化都是在虚拟机层面实现的,并没有直接暴露给我们。

ReentrantLock 是 JDK 层面实现的(也就是 API 层面,需要 lock() 和 unlock() 方法配合 try/finally 语句块来完成),所以我们可以通过查看它的源代码,来看它是如何实现的。

ReentrantLock 比 synchronized 增加了一些高级功能

相比synchronizedReentrantLock增加了一些高级功能。主要来说主要有三点:

  • 等待可中断 : ReentrantLock提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly() 来实现这个机制。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。
  • 可实现公平锁 : ReentrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。ReentrantLock默认情况是非公平的,可以通过 ReentrantLock类的ReentrantLock(boolean fair)构造方法来指定是否是公平的。
  • 可实现选择性通知(锁可以绑定多个条件): synchronized关键字与wait()notify()/notifyAll()方法相结合可以实现等待/通知机制。ReentrantLock类当然也可以实现,但是需要借助于Condition接口与newCondition()方法。

如果你想使用上述功能,那么选择 ReentrantLock 是一个不错的选择。

  • 可中断锁:获取锁的过程中可以被中断,不需要一直等到获取锁之后 才能进行其他逻辑处理。ReentrantLock 就属于是可中断锁。
  • 不可中断锁:一旦线程申请了锁,就只能等到拿到锁以后才能进行其他的逻辑处理。 synchronized 就属于是不可中断锁。

可打断

可打断指的是处于阻塞状态等待锁的线程可以被打断等待。注意lock.lockInterruptibly()lock.trylock()方法是可打断的,lock.lock()不是。可打断的意义在于避免得不到锁的线程无限制地等待下去,防止死锁的一种方式。

使用这个 t1.interrupt();进行打断

条件变量

synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待

ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤 醒
使用要点
  • await 前需要获得锁
  • await 执行后,会释放锁,进入 conditionObject 等待
  • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
  • 竞争 lock 锁成功后,从 await 后继续执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//例子
static ReentrantLock lock = new ReentrantLock();

//条件变零
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();

static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}

Semaphore

synchronizedReentrantLock 都是一次只允许一个线程访问某个资源,而Semaphore(信号量)可以用来控制同时访问特定资源的线程数量。

Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 Semaphore 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。

1
2
3
4
5
6
7
// 初始共享资源数量
final Semaphore semaphore = new Semaphore(5);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();

当初始的资源个数为 1 的时候,Semaphore 退化为排他锁。

Semaphore 有两种模式:。

  • 公平模式: 调用 acquire() 方法的顺序就是获取许可证的顺序,遵循 FIFO;
  • 非公平模式: 抢占式的。

Semaphore 对应的两个构造方法如下:

这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。

Semaphore 通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)。

CountDownLatch

CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

如何使用CountDownLatch

CountDownLatch 的作用就是 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。之前在项目中,有一个使用多线程读取多个文件处理的场景,我用到了 CountDownLatch 。具体场景是下面这样的:

我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。

为此我们定义了一个线程池和 count 为 6 的CountDownLatch对象 。使用线程池处理读取任务,每一个线程处理完之后就将 count-1,调用CountDownLatch对象的 await()方法,直到所有文件读取完之后,才会接着执行后面的逻辑。

伪代码是下面这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CountDownLatchExample1 {
// 处理文件的数量
private static final int threadCount = 6;

public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(推荐使用构造方法创建)
ExecutorService threadPool = Executors.newFixedThreadPool(10);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {
try {
//处理文件的业务操作
//......
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//表示一个文件已经被完成
countDownLatch.countDown();
}

});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}
}


有没有可以改进的地方呢?

可以使用 CompletableFuture 类来改进!Java8 的 CompletableFuture 提供了很多对多线程友好的方法,使用它可以很方便地为我们编写多线程程序,什么异步、串行、并行或者等待所有线程执行完任务什么的都非常方便。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CompletableFuture<Void> task1 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> task6 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);

try {
headerFuture.join();
} catch (Exception ex) {
//......
}
System.out.println("all done. ");

上面的代码还可以继续优化,当任务过多的时候,把每一个 task 都列出来不太现实,可以考虑通过循环来添加任务。

1
2
3
4
5
6
7
8
9
10
11
//文件夹位置
List<String> filePaths = Arrays.asList(...)
// 异步处理所有文件
List<CompletableFuture<String>> fileFutures = filePaths.stream()
.map(filePath -> doSomeThing(filePath))
.collect(Collectors.toList());
// 将他们合并起来
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
fileFutures.toArray(new CompletableFuture[fileFutures.size()])
);

Future

Future 类是异步思想的典型运用,主要用在一些需要执行耗时任务的场景,避免程序一直原地等待耗时任务执行完成,执行效率太低。具体来说是这样的:当我们执行某一耗时的任务时,可以将这个耗时任务交给一个子线程去异步执行,同时我们可以干点其他事情,不用傻傻等待耗时任务执行完成。等我们的事情干完后,我们再通过 Future 类获取到耗时任务的执行结果。这样一来,程序的执行效率就明显提高了。

这其实就是多线程中经典的 Future 模式,你可以将其看作是一种设计模式,核心思想是异步调用,主要用在多线程领域,并非 Java 语言独有。

在 Java 中,Future 类只是一个泛型接口,位于 java.util.concurrent 包下,其中定义了 5 个方法,主要包括下面这 4 个功能:

  • 取消任务;
  • 判断任务是否被取消;
  • 判断任务是否已经执行完成;
  • 获取任务执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// V 代表了Future执行的任务返回值的类型
public interface Future<V> {
// 取消任务执行
// 成功取消返回 true,否则返回 false
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否被取消
boolean isCancelled();
// 判断任务是否已经执行完成
boolean isDone();
// 获取任务执行结果
V get() throws InterruptedException, ExecutionException;
// 指定时间内没有返回计算结果就抛出 TimeOutException 异常
V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutExceptio

}
简单理解就是:我有一个任务,提交给了 Future 来处理。任务执行期间我自己可以去做任何想做的事情。并且,在这期间我还可以取消任务以及获取任务的执行状态。一段时间之后,我就可以 Future 那里直接取出任务执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

//可以使用 CompletableFuture 类来改进!Java8 的 CompletableFuture 提供了很多对多线程友好的方法,使用它可以很方便地为我们编写多线程程序,什么异步、串行、并行或者等待所有线程执行完任务什么的都非常方便。

CompletableFuture<Void> task1 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> task6 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);

try {
headerFuture.join();
} catch (Exception ex) {
//......
}
System.out.println("all done. ");

CyclicBarrier

CyclicBarrierCountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。

CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。**他比CountDownLatch多了一个可以重用的特点**

线程之间资源共享,怎么保证线程安全

  1. 同步机制
    • 锁(Lock):使用内置的锁(如synchronized关键字)或显式锁(如ReentrantLock)来确保同一时间只有一个线程可以访问共享资源。当一个线程获得锁时,其他试图访问该资源的线程将被阻塞,直到锁被释放。
    • 条件变量(Condition):与锁一起使用,允许线程等待某个条件成立后再继续执行。这有助于实现更复杂的同步逻辑。
  2. 原子操作
    • 使用java.util.concurrent.atomic包中的原子类(如AtomicIntegerAtomicLong等)来执行不可中断的操作,这些操作在多线程环境下是安全的。
  3. 并发集合
    • 使用Java并发包(java.util.concurrent)中提供的线程安全集合,如ConcurrentHashMapCopyOnWriteArrayList等。这些集合内部已经实现了必要的同步机制,可以在多线程环境下安全地使用。

4.使用线程安全的队列

  • 当需要在多个线程之间传递数据时,可以使用线程安全的队列(如BlockingQueue)来确保数据的正确传递和处理。
  1. 使用并发工具类
    • Java并发包中提供了许多有用的并发工具类,如CountDownLatchCyclicBarrierSemaphore等。这些工具类可以帮助你更容易地实现复杂的同步逻辑和并发控制。

5。使用一下线程安全的类

1. 集合类

  • Vector:是一个动态数组,实现了List接口,是线程安全的。其所有操作方法都是同步的,因此多个线程可以安全地同时对其进行读写操作。

  • Hashtable:是一个散列表,实现了Map接口,也是线程安全的。其所有操作方法都是同步的。

  • Stack:是一个栈,继承自Vector,因此也是线程安全的。其push、pop和peek等操作方法都是同步的。

  • ConcurrentHashMap:是Java 5引入的线程安全的哈希表实现,它使用了分段锁的机制,将数据分成多个段,每个段上都有一个锁,不同的线程可以同时访问不同的段,提高了并发性能。

  • CopyOnWriteArrayList:是Java 5引入的线程安全的动态数组实现。其在进行写操作时,会创建一个新的数组来进行修改,从而保证了写操作的线程安全性。读取操作则可以并发进行,不需要加锁。

  • CopyOnWriteArraySet:其内部实现是在其类内部声明一个final的CopyOnWriteArrayList属性,并在调用其构造函数时实例化该CopyOnWriteArrayList。

  • BlockingQueue:是一个接口,用于实现线程安全的阻塞队列。常用的实现类有ArrayBlockingQueue、LinkedBlockingQueue和PriorityBlockingQueue等。

  • ConcurrentLinkedQueue:是线程安全的链表实现队列,它使用了无锁算法(CAS)来实现并发操作。

  • StringBuffer

2. 原子类

  • AtomicInteger, AtomicLong 等:这些类提供了原子性操作,确保对整数类型的操作是线程安全的。

乐观锁和悲观锁的区别

悲观锁假定会发⽣冲突,访问的时候都要先获得锁,保证同⼀个 时刻只有线程获得锁,读读也会阻塞;

乐观锁假设不会发⽣冲突,只有在提交操作的时候检 查是否有冲突)

在Java中,线程之间的通信是多线程编程中至关重要的一部分,它确保多个线程能够协调、协作地完成任务。Java提供了多种机制来实现线程间的通信,以下是其中一些主要机制:

  1. 共享变量与同步机制

    • volatile关键字:用于修饰共享变量,确保线程对变量的修改对其他线程立即可见,从而消除指令重排序带来的影响。它适用于简单的状态标记等场景。

    • synchronized关键字

      :用于实现临界区同步,通过锁定特定对象,确保同一时刻只有一个线程能够访问共享资源。在synchronized代码块或方法中,可以安全地修改共享变量。同时,结合wait(),notify(),notifyAll()

      方法,可以实现线程间的通信。

      • wait():使当前线程释放锁并进入等待状态,直到被其他线程调用notify()notifyAll()方法唤醒。
      • notify():唤醒一个因调用wait()而处于等待状态的线程。
      • notifyAll():唤醒所有因调用wait()而处于等待状态的线程。
  2. 消息传递

    • 通过Java中的消息队列(如BlockingQueue)或其他数据结构(如HashMap、LinkedList等)来传递消息或数据。一个线程可以将消息放入队列或数据结构中,另一个线程可以从队列或数据结构中取出消息进行处理。
  3. 其他通信方式

    • wait/notify/notifyAll:结合synchronized关键字使用,实现线程间的等待和通知。
    • Lock和Condition:Java 5引入的新的线程同步机制,提供了更灵活的锁和条件变量功能。
    • CyclicBarrier:允许多个线程互相等待,直到所有线程都到达某个公共屏障点(barrier point)。
    • CountDownLatch:一个同步辅助类,允许一个或多个线程等待直到在其他线程中的一组操作完成。
    • Semaphore:一个计数信号量,用于控制同时访问某个资源或某个代码块的线程数量。

总的来说,Java中的线程通信涉及到共享变量的同步访问、消息的传递以及使用各种同步辅助类来实现线程的协调与同步。这些机制确保了多线程程序中的线程能够正确地交互和协作,从而避免了数据不一致、竞态条件等问题。