JUC系列:(五)无锁

布鸽不鸽 Lv4

CAS

CAS (Compare-And-Swap),是CPU并发原语

在Java中:

  • 调用UnSafe类中的CAS 方法,JVM会实现出CAS汇编指令,这是一种完全依赖于硬件的功能,实现了原子操作

CAS作用:

  • 比较当前工作内存中的值和主物理内存中的值,如果相同则执行规定操作

CAS特点:

  • CAS是基于乐观锁的思想

  • 无锁并发,无阻塞并发(线程不会陷入阻塞,线程不需要频繁切换上下文)

CAS缺点:

  • 执行的是循环操作,可能无限循环导致饥饿(使用CAS线程数最好不要超过CPU核心数)

  • 只能同时保证一个变量的原子操作

  • 引出ABA问题

Atomic

常用API

常见原子类:AtomicInteger、AtomicBoolean、AtomicLong

构造方法:

  • public AtomicInteger():初始化一个默认值为 0 的原子型 Integer
  • public AtomicInteger(int initialValue):初始化一个指定值的原子型 Integer

常用API:

方法作用
public final int get()获取 AtomicInteger 的值
public final int getAndIncrement()以原子方式将当前值加 1,返回的是自增前的值
public final int incrementAndGet()以原子方式将当前值加 1,返回的是自增后的值
public final int getAndSet(int value)以原子方式设置为 new Value 的值,返回旧值
public final int addAndGet(int data)以原子方式将输入的数值与实例中的值相加并返回
实例:AtomicInteger 里的 value

原理分析

AtomicInteger 原理

自旋锁+CAS算法(有3个操作数:内存值V, 旧的预期值A,要修改的值B)

getAndSet方法:

1
2
3
4
5
6
7
8
public final int getAndSet(int newValue) {
/**
* this: 当前对象
* valueOffset: 内存偏移量,内存地址
* valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
*/
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
1
2
3
4
5
6
7
8
9
10
11
// val1: AtomicInteger对象本身,var2: 该对象值的引用地址,var4: 需要变动的数
public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
// 用this和valueOffset找到内存中真实的值
var5 = this.getIntVolatile(var1, var2);

} while(!this.compareAndSwapInt(var1, var2, var5, var4)); // 使用CAS

return var5;
}

getAndSet方法:

1
2
AtomicInteger a = new AtomicInteger();
a.getAndUpdate(i -> i + 10);
1
2
3
4
5
6
7
8
public final int getAndUpdate(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get(); // 内存真实值
next = updateFunction.applyAsInt(prev); // 期望更新到的值
} while (!compareAndSet(prev, next)); // 自旋
return prev;
}

compareAndSet方法:

1
2
3
4
5
6
7
8
9
public final boolean compareAndSet(int expect, int update) {
/**
* this: 当前对象
* valueOffset: 内存偏移量,内存地址
* expect: 期望的值
* update: 更新的值
*/
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

原子引用

对Object进行原子操作

原子引用类:AtomicReferenceAtomicStampedReferenceAtomicMarkableReference

AtomicReference类:

  • 构造方法:AtomicReference<T> atomicReference = new AtomicReference<T>()

  • 常用 API:

    • public final boolean compareAndSet(V expectedValue, V newValue):CAS操作,返回是否成功
    • public final void set(V newValue):将值设置为newValue
    • public final V get():返回当前值
1
2
3
4
5
6
7
8
9
10
11
12
13
// 需要自行实现自旋
Student s1 = new Student(33, "z3");

AtomicReference<Student> atomicReference = new AtomicReference<>();
atomicReference.set(s1);

while (true) {
Student s2 = new Student(44, "l4");
if (atomicReference.compareAndSet(s1, s2)) {
break;
}
}
System.out.println(atomicReference.get());

原子数组

原子数组类:AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray

AtomicIntegerArray类方法:

1
2
3
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

原子更新器

原子更新器类:AtomicReferenceFieldUpdaterAtomicIntegerFieldUpdaterAtomicLongFieldUpdater

  • 对象的某个域(Field)进行原子操作

  • 只能配合volatile修饰的字段使用,否则会出现异常 IllegalArgumentException: Must be volatile type

常用API:

  • static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> c, String fieldName):构造方法
  • abstract boolean compareAndSet(T obj, int expect, int update):CAS方法
1
2
3
4
5
AtomicIntegerFieldUpdater<Student> fieldUpdater = 
AtomicIntegerFieldUpdater.newUpdater(Student.class, "id");

Student s1 = new Student(0, "z3");
boolean b = fieldUpdater.compareAndSet(s1, 0, 33);

原子累加器

原子累加器类:LongAdderDoubleAdderLongAccumulatorDoubleAccumulator

LongAdder和LongAccumulator区别:

  • 相同点:
    • 都是CAS实现的
    • 当accumulatorFunction等于null,初始值等于0时,LongAccumulator等于LongAdder
  • 不同点:
    • LongAdder是LongAccumulator的特例
    • LongAccumulator可以自定义累加规则,初始值
1
2
3
4
5
6
7
LongAdder longAdder = new LongAdder();

for (int i = 0; i < 1000; i++) {
new Thread(() -> {
longAdder.increment();
}).start();
}
1
2
3
4
5
6
7
8
// 参数依次是:原值,累加值,初始值
LongAccumulator longAccumulator = new LongAccumulator((left, right) -> left + right, 0);

for (int i = 0; i < 1000; i++) {
new Thread(() -> {
longAccumulator.accumulate(1);
}).start();
}

Adder

优化机制

LongAdder是JDK8提供的类,跟AtomicLong有相同的效果,但对CAS机制进行了优化

  • CAS缺点:大量空循环,自旋转

  • 优化思路:数据分离,将单点得更新压力分担到各个节点上,空间换时间

分段CAS机制:

  • 发生竞争时:创建Cell数组,将不同线程的操作离散到不同节点上(通过hash等算法映射)
  • 设置多个累加单元,比如Thread-0累加Cell[0],Thread-1累加Cell[1]…最后将结果汇总
  • 累加时操作不同的Cell变量,减少了CAS失败,从而提升了性能

自动分段迁移机制:

  • 某个Cell的value执行CAS失败,自动寻找另一个Cell分段内的value值进行CAS

伪共享

Cell为累加的单元,是数组的形式

数组访问的索引是通过Thread里的threadLocalRandomProbe域取模实现的(这个域是ThreadLocalRandom更新的)

下面是Cell的部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Striped64类中
@sun.misc.Contended static final class Cell {
volatile long value;

Cell(long x) {
value = x;
}

final boolean cas(long cmp, long val) {
// 使用CAS方式累加,cmp为旧值,val为新值
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// ...
}

@sun.misc.Contended注解的作用:

Cell是数组的形式,在内存中是连续的。64位系统中,一个Cell为24字节(16字节的对象头,8字节的value)。一个cache line是64字节的,因此缓存可以存下两个Cell对象。当Core-0要修改Cell[0],Core-1要修改Cell[1]时,无论谁成功都会导致当前缓存行失效,进而数据失效,需要从主存获取,影响效率

@sun.misc.Contended:在对象或字段的前后各增加128字节大小的padding,让CPU将对象预读至缓存时占用不同的缓存行,这样就不会造成对方缓存行的失效

源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// LongAdder继承于Striped64
abstract class Striped64 extends Number {

// 当前CPU数量
static final int NCPU = Runtime.getRuntime().availableProcessors();

// 累加单元数组
transient volatile Cell[] cells;

// 基础值。如果没有发生竞争,会累加这个域。
transient volatile long base;

// 初始化或者扩容时,只能有一个线程执行,通过CAS设置cellsBusy为1来实现加锁
transient volatile int cellsBusy;
}

工作流程:

  • cells占用内存大,所以是惰性加载的。如果无竞争,会直接累加base。
  • 第一次竞争发生时,创建大小为2的cells数组,将当前base值包装为Cell对象,放入映射的槽位上
  • 分段迁移后,还发生竞争时,扩容cells数组长度为原来的2倍,然后rehash。
    • 数组总长度是2的幂,默认为最大CPU数量(比如:6核心CPU最大为8槽位)
  • 分段累加的过程中,如果当前线程对应的cells槽位为空,会新建Cell填充
    • 如果出现竞争,重新计算线程对应的槽位,继续自旋尝试修改

LongAdder#add方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void add(long x) {

Cell[] as;
long b, v;
int m;
Cell a;

// cells不为空,说明cells已被初始化
// 进入||后的逻辑,去更新base域,失败表示发生竞争
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 多线程写base发生竞争,使用cells来写

// 表示cell没有竞争
boolean uncontended = true;

// 条件一:cells未初始化
// 条件二:对应槽位尚未初始化
// 条件三:当前槽位CAS有竞争
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x))) {
// 上述条件满足任何一条后,进入以下逻辑

// uncontended在对应的cell上累加失败的时候才为false,其余情况均为true
longAccumulate(x, null, uncontended);
}
}
}
1
2
3
4
// Striped64类的方法
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// Striped64类的方法
// 当前值:x, null, false|true
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 当前线程还没有对应cell,随机生成一个hash值,将当前线程绑定到一个cell
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}

// 表示扩容意向,false一定不会扩容,true可能扩容
boolean collide = false;
for (;;) {
Cell[] as; // cells引用
Cell a; // 当前线程Cell
int n; // cells长度
long v; // 期望值

// 【case1】如果cells已经初始化,应将数据写入对应的cell中
if ((as = cells) != null && (n = as.length) > 0) {
// 【case1.1】当前下标的cell为null,需要初始化
if ((a = as[(n - 1) & h]) == null) {
// 是否被锁(通过cellsBusy)
if (cellsBusy == 0) {
// 创建cell,初始值为x
Cell r = new Cell(x);
// 加锁
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 解锁
cellsBusy = 0;
}
if (created) // 创建完成,可以退出循环了
break;
continue;
}
}
collide = false;
}
// 【case1.2】线程对应的cell有竞争,改变对应的cell来重试
else if (!wasUncontended)
wasUncontended = true;
// 【case1.3】当前线程rehash过,新命中的cell不为空,尝试累加
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
// 【case1.4】cells长度超长或已经扩容过了
else if (n >= NCPU || cells != as)
collide = false; // 扩容意向变为false
// 【case1.5】更改扩容意向为true,上一行判断如果进入了,永远不会进入这一条判断的
else if (!collide)
collide = true;
// 【case1.6】逻辑扩容,进行加锁
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1]; // 扩容为以前的两倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false; // 扩容意向改为false,表示不扩容了
continue;
}
h = advanceProbe(h); // 重置当前线程hash值
}
//【case2】cells还没初始化,判断有没有加锁,没有就用CAS加锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//【case3】说明其他线程正在初始化cells,当前线程值将累加到base,累加成功结束自旋
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
}
}

LongAdder#sum方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
public long sum() {
Cell[] as = cells;
Cell a;
long sum = base;
if (as != null) {
// 遍历cells,累加
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

ABA

什么是ABA问题:

  • (A)一个线程先读取共享内存数据值A,随后因某种原因,线程暂时挂起
  • (B)另一个线程临时将共享内存数据值先改为B,随后又改回为A
  • (A)并通过CAS比较,最终比较结果将会无变化

解决方法,用版本号:

  • AtomicStampedReference内部还维护了一个“状态戳”
  • 修改对象值得同时,也要修改状态戳
  • AtomicStampedReference设置对象值时,对象值以及状态戳都必须满足期望值,写入才会成功
1
2
3
4
5
6
7
while (true) {
int stamp = num.getStamp();
Integer expected = num.getReference();
if (num.compareAndSet(expected, expected + 100, stamp, stamp + 1)) {
break;
}
}

Unsafe

  • Unsafe类是在sun.misc包下,不属于Java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Cassandra、Hadoop、Kafka等

  • Java中Unsafe类为我们提供类似C++手动管理内存的功能,同时也有了指针的问题

  • Unsafe类是final的,不能继承;构造函数是private的,因此无法在外部实例化

    • 只能用反射获取实例
  • 所有方法都是native的

1
2
3
4
5
6
public final class Unsafe {

private static final Unsafe theUnsafe = new Unsafe();

private Unsafe() {}
}

获取Unsafe

1
2
3
4
5
public Unsafe getUnsafe() throws IllegalAccessException {
Field unsafeField = Unsafe.class.getDeclaredFields()[0];
unsafeField.setAccessible(true);
return (Unsafe) unsafeField.get(null);
}
image-20231120203047561

CAS相关

1
2
3
4
5
public final boolean compareAndSwapObject(Object o, long offset, Object expected, Object x)

public final boolean compareAndSwapInt(Object o, long offset, int expected, int x)

public final boolean compareAndSwapLong(Object o, long offset, long expected, long x)

偏移量相关

  • 读写属性,需要用到偏移量(offset)
1
2
3
4
5
// 获取静态属性,在对象中的偏移量
public long staticFieldOffset(Field f)

// 获取非静态属性,在对象中的偏移量
public long objectFieldOffset(Field f)

类加载

1
2
3
4
5
6
7
8
// 创建一个类的实例,但是不会调用他的构造方法。如果类还没有初始化,则初始化这个类
public Object allocateInstance(Class<?> cls)

// 判断是否需要初始化一个类
public boolean shouldBeInitialized(Class<?> c)

// 确保一个类已经初始化了
public void ensureClassInitialized(Class<?> c)
  • defineClass方法:定义一个类,用于动态地创建类。(JDK11被移除)
  • defineAnonymousClass方法:用于动态的创建一个匿名内部类。(JDK11被移除)

普通读写

1
2
3
public int getInt(Object o, long offset)

public void putInt(Object o, long offset, int x)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class UnsafeTest {

public static void main(String[] args) throws IllegalAccessException, NoSuchFieldException {
UnsafeTest unsafeTest = new UnsafeTest();
Unsafe unsafe = unsafeTest.getUnsafe();
Guard guard = new Guard();

Field field = guard.getClass().getDeclaredField("value");
unsafe.putInt(guard, unsafe.objectFieldOffset(field), 100); // 需要获取偏移量

System.out.println("guard.getValue() = " + guard.getValue());
}

public Unsafe getUnsafe() throws IllegalAccessException {
Field unsafeField = Unsafe.class.getDeclaredFields()[0];
unsafeField.setAccessible(true);
return (Unsafe) unsafeField.get(null);
}
}

class Guard {
private int value = 1;

public int getValue() {
return value;
}
}

内存屏障

  • 主要为了避免代码重排序
    • 比如:使用loadFence(),则屏障前的load操作不能被重排序到屏障后,屏障后的load操作不能被重排序到屏障前
1
2
3
4
5
public void loadFence()		// 保证在这个屏障之前,所有读操作都已经完成

public void storeFence() // 保证在这个屏障之前,所有写操作都已经完成

public void fullFence() // 保证在这个屏障之前,所有读写操作都已经完成

线程调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 恢复阻塞线程
public void unpark(Object thread)

// 阻塞线程
public void park(boolean isAbsolute, long time)

// 获得对象锁(可重入锁)
@Deprecated
public native void monitorEnter(Object o)

// 释放对象锁
@Deprecated
public native void monitorExit(Object o)

// 尝试获取对象锁
@Deprecated
public native boolean tryMonitorEnter(Object o)
  • park方法,unpark方法,看过LockSupport类的都不会陌生,这两个方法主要用来挂起和唤醒线程
    • LockSupport中的parkunpark方法正是通过Unsafe来实现的:
1
2
3
4
5
6
7
8
9
10
11
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}

public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
  • monitorEnter方法和monitorExit方法用于加锁,Java中的synchronized锁就是通过这两个指令来实现的
    • JDK11之后,这几个方法被移除

final

原理

1
2
3
public class Main6 {
final int a = 20;
}

字节码如下:

1
2
3
4
5
6
7
 0 aload_0
1 invokespecial #1 <java/lang/Object.<init> : ()V>
4 aload_0
5 bipush 20 # 将值直接放入栈中
7 putfield #2 # Field a:I
# 此处有个写屏障
10 return

final变量的赋值通过putfield指令来完成,这条指令后会加入写屏障,避免其他线程读到他的值不会出现为零值的问题

其他线程访问final修饰的变量:

  • 赋值一份放入栈中,直接访问
  • 大于short的最大值,会将其复制到类的常量池,访问时从常量池取

不可变

  • 定义:一个对象不能够修改其内部状态(属性),比如String类

  • 不可变对象是线程安全的

  • 更改String对象,会构建新的字符串对象(这种创建副本来避免共享的方式称为:保护性拷贝)

1
2
3
4
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
private final char value[];
}

无状态

状态信息:成员变量保存的数据

无状态:没有成员变量

如果一个类没有任何成员变量,那他就是线程安全的

ThreadLocal

基本介绍

  • 用来提供线程内部的局部变量,分配在堆内的TLAB中

  • ThreadLocal实例属于一个线程的本地变量,每个线程都有,所以是线程安全的

基本使用

方法描述
ThreadLocal<>()创建ThreadLocal对象
protected T initialValue()返回:当前线局部变量,的初始值
public void set( T value)设置:当前线程绑定,的局部变量
public T get()获取:当前线程绑定,的局部变量
public void remove()移除:当前线程绑定,的局部变量
1
2
3
4
5
6
7
8
9
10
11
12
13
class MyDemo {
private static ThreadLocal<String> t1 = new ThreadLocal<>();

private String content;

public String getContent() {
return t1.get();
}

public void setContent(String content) {
t1.set(content);
}
}
1
2
3
4
5
6
7
8
for (int i = 0; i < 5; i++) {
String finalI = "" + i;
new Thread(() -> {
// 每个线程都有自己的独立的数据
myDemo.setContent(finalI);
String content = myDemo.getContent();
}).start();
}

实现原理

底层结构

JDK8以前:

  • 每个ThreadLocal都创建一个Map,线程作为key,存储的值作为value

  • 缺点:Map结果过大、内存泄露(Thread停止后,key对应的数据无法删除)

JDK8以后:

  • 每个Thread维护一个ThreadLocalMap,ThreadLocal实例作为key,存储的值作为value

  • 优点:

    • Map中的Entry数量变少(以前由Thread数决定,现在由ThreadLocal数量决定)
    • 防止内存泄露(Thread销毁,ThreadLocalMap也会销毁)

成员变量

  • 每个Thread线程持有一个ThreadLocalMap对象
1
2
3
4
public class Thread implements Runnable {

ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}
  • ThreadLocal中:
1
2
3
4
5
6
7
8
public class ThreadLocal<T> {

private final int threadLocalHashCode = nextHashCode(); // 计算ThreadLocal哈希值

private static AtomicInteger nextHashCode = new AtomicInteger();// 每新增ThreadLocal对象,分配新hash

private static final int HASH_INCREMENT = 0x61c88647; // 新增hash量(黄金分割数)
}

成员方法

  • 返回线程局部变量的初始值(此方法缺省,为了让子类覆盖而设计的)
1
2
3
protected T initialValue() {
return null;
}
  • 计算哈希值(每次以斐波纳契数/黄金分割数递增),好处是分布均匀
1
2
3
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
  • 修改当前线程ThreadLocal的局部变量
1
2
3
4
5
6
7
8
9
10
11
12
13
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}

ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
  • 获取当前线程ThreadLocal的局部变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// map==null,或者没有与当前ThreadLocal关联的entry,就会设置默认值
return setInitialValue();
}

// 设置默认值(null)
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
if (this instanceof TerminatingThreadLocal) {
TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
}
return value;
}

protected T initialValue() {
return null;
}
  • 移除当前线程ThreadLocal的局部变量
1
2
3
4
5
6
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null) {
m.remove(this);
}
}

ThreadLocalMap

成员属性

是ThreadLocal内部类,用独立的方式实现了Map的功能,内部Entry也是独立实现

1
2
3
4
5
6
7
8
9
10
11
// 散列表数组的初始长度16
private static final int INITIAL_CAPACITY = 16;

// 存放数据的table,长度必须是2的幂。
private Entry[] table;

// 数组里面entrys的个数,可以用于判断table当前使用量是否超过阈值
private int size = 0;

// 进行扩容的阈值,表使用量大于它的时候进行扩容。
private int threshold;

存储结构Entry:

  • 继承WeakReference,key是弱引用,以便将ThreadLocal生命周期与线程生命周期解绑
1
2
3
4
5
6
7
8
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
// this.referent = referent = key;
super(k);
value = v;
}
}

构造方法(是延迟初始化的,Thread第一次存储firstKey-firstValue时才会创建ThreadLocalMap

1
2
3
4
5
6
7
8
9
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
// 【寻址算法】计算索引
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
// 将阈值设置为 (当前数组长度 * 2)/ 3。
setThreshold(INITIAL_CAPACITY);
}

成员方法

添加数据:

  • 一直探测下一个地址,直到有空的地址后插入
  • 若插入后Map数量超过阈值,数组扩容为原来两倍
  • 探测过程中,key为null的脏Entry会被垃圾清理,并复用位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void set(ThreadLocal<?> key, Object value) {

Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1); // 哈希寻址

for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { // 一直找到entry为null
ThreadLocal<?> k = e.get();

if (k == key) { // ThreadLocal对应的key存在,直接覆盖即可
e.value = value;
return;
}

if (k == null) { // key为null,值不为null,说明之前ThreadLocal对象已被回收了
replaceStaleEntry(key, value, i); // 垃圾清理,槽位复用
return;
}
}

// 逻辑到这里,说明碰到槽位为null的情况,创建新的Entry
tab[i] = new Entry(key, value);
int sz = ++size;

// 启发式清理一下,还是超过阈值,则进行扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
1
2
3
4
5
6
7
8
9
private static int nextIndex(int i, int len) {
// 环形数组,不断访问下一个
return ((i + 1 < len) ? i + 1 : 0);
}

private static int prevIndex(int i, int len) {
// 环形数组,不断访问上一个
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// stateSlot是最新的过期数据的位置
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;

int slotToExpunge = staleSlot;

// 在不为空的Entry中向前找,找到第一个过期数据的位置(key,也即弱引用,为null)
for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;

// 向后找,找到第一个值为null的位置为止
for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();

// 找到key相等的条件【替换逻辑】
if (k == key) {
e.value = value;

// 本来要在staleSlot处放置数据,现在发现i处key与数据一致。
// 把数据放在stateSlot处。原本在stateSlot处的过期数据放到i这里来
tab[i] = tab[staleSlot];
tab[staleSlot] = e;

// 之前往前没找到过期数据,第一条过期数据是位置staleSlot
if (slotToExpunge == staleSlot)
// 不过现在第一条过期数据到位置i了
slotToExpunge = i;

// 从slotToExpunge往后清理垃圾
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}

// 当前位置i也是过期数据,且之前向前找没找到过期数据,i就是第一条过期数据
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);

// 从slotToExpunge往后清理垃圾
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

image-20231212132900946

获取数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// get函数调用它
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
// 线性探测
return getEntryAfterMiss(key, i, e);
}

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;


while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key) // 找到第一个满足条件的
return e;
if (k == null) // 过期数据回收
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null; // 没有找到数据,返回null(数据一定是紧挨着的,不可能隔一个null放到后面)
}

rehash:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void rehash() {
expungeStaleEntries(); // 遍历所有槽位清理过期数据

if (size >= threshold - threshold / 4) // 还是超过阈值,扩容
resize();
}

private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) { // 遍历所有槽位清理过期数据
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j); // 探测式清理
}
}

remove:

1
2
3
4
5
6
7
8
9
10
11
12
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) { // 找到了对应的key
e.clear(); // 设置key为null
expungeStaleEntry(i); // 探测式清理
return;
}
}
}

探测式清理方法

1
private int expungeStaleEntry(int staleSlot) {}
image-20231212134042892 image-20231212134055906

启发式清理

  • 向后循环扫描,发现过期数据就用探测式清理方法,来清理
  • 加入当前数组长度为16,16->8->4->2->1,会向后扫描四次
  • 假如扫到了垃圾,就又恢复到16,重新开始16->8->4…
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}

InheritableThreadLocal

基本使用

父子线程:创建子线程的就是父线程,比如实例中的main线程就是父线程

想实现线程间局部变量传递,可以使用InheritableThreadLocal

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>();

threadLocal.set("hello"); // 父线程设置值

new Thread(() -> {
String s = threadLocal.get(); // 子线程也能拿到
}).start();
}

实现原理

InheritableThreadLocal源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class InheritableThreadLocal<T> extends ThreadLocal<T> {

protected T childValue(T parentValue) {
return parentValue;
}

ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}

void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}

在Thread的构造方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 构造函数调用了init方法
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {

Thread parent = currentThread();

// 父线程的inheritableThreadLocals属性不为null,复制父线程的inheritableThreadLocals
if (inheritThreadLocals && parent.inheritableThreadLocals != null) {
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
}
}

// 本质上还是创建ThreadLocalMap
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}

// 逐个复制父线程ThreadLocalMap中的数据
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];

for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}
  • 标题: JUC系列:(五)无锁
  • 作者: 布鸽不鸽
  • 创建于 : 2024-01-10 14:54:58
  • 更新于 : 2024-01-10 14:56:29
  • 链接: https://xuedongyun.cn//post/62388/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论