线程与并发编程
# 15.线程与并发编程
# 目录介绍
- 01.为什么Android需要多线程
- 1.1 单线程的困境
- 1.2 Android中的线程分类
- 02.Android线程模型与UI线程
- 2.1 为什么UI操作必须在主线程
- 2.2 主线程的消息循环
- 03.Thread与Runnable底层原理
- 3.1 Java Thread与Linux线程的关系
- 3.2 线程的生命周期
- 3.3 线程创建的开销
- 04.HandlerThread原理与应用
- 4.1 HandlerThread的设计
- 4.2 HandlerThread的使用场景
- 05.IntentService的设计原理
- 5.1 IntentService的实现
- 5.2 IntentService的设计优势
- 06.AsyncTask的设计与缺陷
- 6.1 AsyncTask的内部原理
- 6.2 AsyncTask的缺陷
- 07.线程池ThreadPoolExecutor原理
- 7.1 ThreadPoolExecutor核心参数
- 7.2 任务提交的处理流程
- 7.3 Worker的工作原理
- 08.Android中的线程池最佳实践
- 8.1 常见的线程池配置
- 8.2 全局线程池管理
- 8.3 线程池的监控
- 09.synchronized与对象锁原理
- 9.1 synchronized在ART中的实现
- 9.2 锁的升级过程
- 9.3 wait/notify的实现
- 10.volatile与内存可见性
- 10.1 为什么需要volatile
- 10.2 volatile的内存语义
- 11.CAS与原子操作
- 11.1 CAS的原理
- 11.2 AtomicInteger的实现
- 11.3 ABA问题
- 12.并发容器原理
- 12.1 ConcurrentHashMap
- 12.2 CopyOnWriteArrayList
- 12.3 BlockingQueue
- 13.Kotlin协程原理
- 13.1 协程的本质
- 13.2 协程的挂起与恢复
- 13.3 协程调度器
- 14.协程与线程的关系
- 14.1 协程运行在线程上
- 14.2 协程vs线程的开销对比
- 14.3 结构化并发
- 15.RxJava线程调度原理
- 15.1 RxJava的线程调度
- 15.2 subscribeOn的实现原理
- 15.3 observeOn的实现原理
- 15.4 AndroidSchedulers.mainThread()
- 16.总结与技术思考
- 16.1 核心要点回顾
- 16.2 方案选型建议
- 16.3 面试高频问题
# 01.为什么Android需要多线程
# 1.1 单线程的困境
Android的UI操作必须在主线程(UI线程)中执行。如果在主线程中执行耗时操作,会导致界面卡顿甚至ANR:
主线程执行耗时操作的后果:
用户点击按钮
│
├── 主线程开始执行网络请求(3秒)
│ │
│ │ ← 这3秒内:
│ │ - UI无法响应触摸事件
│ │ - 动画停止
│ │ - 用户看到界面"冻住"了
│ │
│ └── 如果超过5秒 → InputDispatcher触发ANR
│
└── 网络请求完成,更新UI
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
所以Android需要多线程:耗时操作放在工作线程,UI更新在主线程。但这引出了线程间通信的问题,Handler机制就是为此设计的。
# 1.2 Android中的线程分类
Android系统中的线程类型:
主线程(UI线程):
└── ActivityThread.main()启动的线程
└── 处理UI绑定、触摸事件、生命周期回调
└── Looper.getMainLooper()获取其Looper
Binder线程:
└── 处理跨进程Binder调用
└── 默认最大16个
└── /proc/pid/task/中可以看到
渲染线程(RenderThread):
└── Android 5.0+引入
└── 执行硬件加速的渲染命令
└── 与主线程并行工作
工作线程:
└── 开发者创建的后台线程
└── AsyncTask、HandlerThread、线程池等
└── 执行耗时操作
GC线程:
└── ART的垃圾回收线程
└── Concurrent Copying GC的并发标记线程
JIT编译线程:
└── ART的即时编译线程
└── 在后台将热点方法编译为机器码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 02.Android线程模型与UI线程
# 2.1 为什么UI操作必须在主线程
疑惑:为什么不能在工作线程更新UI?多线程操作UI不是更高效吗?
答疑:Android选择单线程UI模型是有深刻原因的。
如果允许多线程操作UI,需要对所有View操作加锁。一个界面可能有几十个View,每个View有几十个属性。如果每次操作都加锁,性能开销极大。更重要的是,View之间有复杂的依赖关系(父子、兄弟),要实现无死锁的细粒度锁几乎不可能。
所以Android选择了单线程模型:所有UI操作在主线程串行执行,不需要任何锁。
// ViewRootImpl.java - UI线程检查
void checkThread() {
if (mThread != Thread.currentThread()) {
throw new CalledFromWrongThreadException(
"Only the original thread that created a view hierarchy " +
"can touch its views.");
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 2.2 主线程的消息循环
// ActivityThread.java
public static void main(String[] args) {
// 1.创建主线程的Looper
Looper.prepareMainLooper();
// 2.创建ActivityThread
ActivityThread thread = new ActivityThread();
thread.attach(false, startSeq);
// 3.获取Handler
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler(); // mH
}
// 4.进入消息循环(永不返回)
Looper.loop();
// 如果执行到这里,说明loop退出了
throw new RuntimeException("Main thread loop unexpectedly exited");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
主线程的所有工作都是通过Handler消息驱动的:
主线程的消息驱动模型:
MessageQueue中的消息类型:
├── LAUNCH_ACTIVITY → 启动Activity
├── PAUSE_ACTIVITY → 暂停Activity
├── BIND_SERVICE → 绑定Service
├── RECEIVER → 接收广播
├── VIEW_INVALIDATE → 重绘View
├── INPUT_EVENT → 处理输入事件
├── CHOREOGRAPHER_CALLBACK → VSync帧回调
└── ... 几乎所有主线程行为都是消息驱动的
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 03.Thread与Runnable底层原理
# 3.1 Java Thread与Linux线程的关系
Android中的Java Thread底层是Linux的pthread:
Java Thread创建到Linux线程的映射:
new Thread().start()
│
└── Thread.start() (Java层)
└── nativeCreate() (JNI调用)
└── Thread::CreateNativeThread() (ART)
└── pthread_create() (Bionic libc)
└── clone() (Linux系统调用)
└── 内核创建新的task_struct
└── 新线程开始执行
线程的本质:
Java Thread对象 ← 1:1 → pthread ← 1:1 → Linux内核线程
Java Thread对象存储在Java堆中
pthread_t结构存储在Native堆中
task_struct存储在内核空间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 3.2 线程的生命周期
// Thread的状态(Java层)
public enum State {
NEW, // 创建但未start
RUNNABLE, // 正在运行或等待CPU调度
BLOCKED, // 等待监视器锁(synchronized)
WAITING, // 无限期等待(wait/join/park)
TIMED_WAITING, // 有超时的等待(sleep/wait(timeout))
TERMINATED // 执行完毕
}
// 状态转换
NEW ──start()──► RUNNABLE ──run()结束──► TERMINATED
│ ↑
│ │ 获得锁
synchronized │ │
▼ │
BLOCKED
RUNNABLE ──wait()──► WAITING ──notify()──► RUNNABLE
RUNNABLE ──sleep()──► TIMED_WAITING ──超时──► RUNNABLE
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 3.3 线程创建的开销
创建一个线程的开销:
1. 内存开销
├── Java Thread对象:~100字节
├── 线程栈:默认1MB(可通过构造函数设置)
└── Native结构:pthread ~1KB
2. 时间开销
└── 创建线程:约0.5-1ms
└── 销毁线程:约0.3ms
└── 相比之下,线程池复用线程:~0.01ms
3. 系统资源
└── 每个线程需要一个内核task_struct
└── 线程切换需要保存/恢复寄存器上下文
└── 线程过多会增加调度开销
结论:
不要频繁创建和销毁线程
使用线程池来复用线程
控制线程总数(通常CPU核心数*2左右)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 04.HandlerThread原理与应用
# 4.1 HandlerThread的设计
HandlerThread是一个自带Looper的线程,使得工作线程也能接收和处理消息:
// HandlerThread.java
public class HandlerThread extends Thread {
Looper mLooper;
Handler mHandler;
@Override
public void run() {
mTid = Process.myTid();
// 创建Looper
Looper.prepare();
synchronized (this) {
mLooper = Looper.myLooper();
notifyAll(); // 通知getLooper()的等待者
}
// 设置线程优先级
Process.setThreadPriority(mPriority);
onLooperPrepared(); // 可以重写,在Looper运行前做初始化
// 进入消息循环
Looper.loop(); // 阻塞,直到quit()被调用
mTid = -1;
}
// 获取Looper(如果线程还没准备好,会阻塞等待)
public Looper getLooper() {
if (!isAlive()) return null;
synchronized (this) {
while (isAlive() && mLooper == null) {
try {
wait(); // 等待Looper创建完成
} catch (InterruptedException e) {
}
}
}
return mLooper;
}
// 退出消息循环
public boolean quit() {
Looper looper = getLooper();
if (looper != null) {
looper.quit(); // 移除所有消息,退出loop
return true;
}
return false;
}
// 安全退出(处理完已有消息后退出)
public boolean quitSafely() {
Looper looper = getLooper();
if (looper != null) {
looper.quitSafely();
return true;
}
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 4.2 HandlerThread的使用场景
HandlerThread的典型应用:
1. 串行化后台任务
└── 所有任务通过Handler发送到同一个线程
└── 天然保证执行顺序和线程安全
2. IntentService的底层实现
└── 使用HandlerThread处理onHandleIntent
3. 轮询任务
└── 通过postDelayed实现定时执行
4. 数据库操作线程
└── 所有数据库操作在同一个HandlerThread中
└── 避免并发问题
注意事项:
└── 使用完后必须调用quit()或quitSafely()
└── 否则线程和Looper会一直存在(内存泄漏)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 05.IntentService的设计原理
# 5.1 IntentService的实现
IntentService基于HandlerThread实现,一个串行处理Intent的Service:
// IntentService.java(已被JobIntentService/WorkManager替代)
public abstract class IntentService extends Service {
private volatile Looper mServiceLooper;
private volatile ServiceHandler mServiceHandler;
// 内部Handler
private final class ServiceHandler extends Handler {
public ServiceHandler(Looper looper) {
super(looper);
}
@Override
public void handleMessage(Message msg) {
// 在HandlerThread中执行
onHandleIntent((Intent) msg.obj);
// 处理完自动停止Service
stopSelf(msg.arg1);
}
}
@Override
public void onCreate() {
super.onCreate();
// 创建HandlerThread
HandlerThread thread = new HandlerThread("IntentService[" + mName + "]");
thread.start();
// 获取Looper和创建Handler
mServiceLooper = thread.getLooper();
mServiceHandler = new ServiceHandler(mServiceLooper);
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
// 将Intent封装为Message发送到HandlerThread
Message msg = mServiceHandler.obtainMessage();
msg.arg1 = startId;
msg.obj = intent;
mServiceHandler.sendMessage(msg);
return mRedelivery ? START_REDELIVER_INTENT : START_NOT_STICKY;
}
@Override
public void onDestroy() {
mServiceLooper.quit();
}
// 子类实现,在工作线程中处理Intent
protected abstract void onHandleIntent(Intent intent);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 5.2 IntentService的设计优势
IntentService的设计思想:
问题:如何在Service中安全地执行耗时操作?
方案1:直接在onStartCommand中开线程
缺点:多次startService可能创建多个线程
缺点:Service不知道什么时候该停止
方案2:IntentService
├── 内部只有一个工作线程(HandlerThread)
├── 多个Intent串行处理(不会并发)
├── 处理完自动停止Service(stopSelf(startId))
└── 不需要手动管理线程生命周期
注意:Android 8.0后台Service限制
IntentService作为后台Service受到限制
推荐使用WorkManager替代
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 06.AsyncTask的设计与缺陷
# 6.1 AsyncTask的内部原理
// AsyncTask.java(已废弃,但理解其设计有价值)
public abstract class AsyncTask<Params, Progress, Result> {
// 串行执行器
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
// 线程池
public static final Executor THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(
CORE_POOL_SIZE, // 核心线程数 = CPU核心数+1
MAXIMUM_POOL_SIZE, // 最大线程数 = CPU核心数*2+1
KEEP_ALIVE_SECONDS, // 空闲超时30秒
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(128));
// 内部Handler(用于切换到主线程)
private static final InternalHandler sHandler = new InternalHandler(
Looper.getMainLooper());
// 串行执行器实现
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<>();
Runnable mActive;
public synchronized void execute(Runnable r) {
// 将任务加入队列
mTasks.offer(() -> {
try {
r.run();
} finally {
// 当前任务完成后,调度下一个
scheduleNext();
}
});
// 如果没有活跃任务,开始调度
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 6.2 AsyncTask的缺陷
AsyncTask被废弃的原因:
1. 内存泄漏
└── AsyncTask作为Activity的内部类
└── 持有Activity的引用
└── Activity销毁后AsyncTask仍在运行 → 泄漏
2. 生命周期问题
└── Activity重建(旋转屏幕)后
└── 旧的AsyncTask持有旧Activity引用
└── 在旧Activity上更新UI → 崩溃或无效
3. 串行执行(Android 3.0+默认)
└── 全局共享一个SerialExecutor
└── 一个AsyncTask阻塞会影响所有其他AsyncTask
4. API设计问题
└── cancel()不能真正取消已开始的任务
└── 只是设置isCancelled标志
└── doInBackground中需要手动检查
替代方案:
├── Kotlin协程(推荐)
├── RxJava
├── Executor + Handler
└── WorkManager(持久化任务)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 07.线程池ThreadPoolExecutor原理
# 7.1 ThreadPoolExecutor核心参数
// ThreadPoolExecutor构造函数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 非核心线程空闲超时时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 7.2 任务提交的处理流程
ThreadPoolExecutor任务调度流程:
提交新任务(execute(task))
│
├── 1.当前线程数 < corePoolSize?
│ ├── 是 → 创建核心线程执行任务
│ └── 否 ↓
│
├── 2.workQueue未满?
│ ├── 是 → 将任务放入队列等待
│ └── 否 ↓
│
├── 3.当前线程数 < maximumPoolSize?
│ ├── 是 → 创建非核心线程执行任务
│ └── 否 ↓
│
└── 4.执行拒绝策略(RejectedExecutionHandler)
├── AbortPolicy → 抛出RejectedExecutionException
├── CallerRunsPolicy → 在调用者线程执行
├── DiscardPolicy → 静默丢弃
└── DiscardOldestPolicy → 丢弃队列最老任务
图示:
核心线程已满?
│
否 ──────┤────── 是
│ │
创建核心线程 队列已满?
执行任务 │
否 ──────┤────── 是
│ │
入队等待 最大线程已满?
│
否 ──────┤────── 是
│ │
创建非核心线程 拒绝策略
执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 7.3 Worker的工作原理
// ThreadPoolExecutor.Worker
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
// Worker的执行循环
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null) {
w.lock();
try {
// 执行前回调
beforeExecute(wt, task);
// 执行任务
task.run();
// 执行后回调
afterExecute(task, null);
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// getTask()返回null,线程退出
processWorkerExit(w, completedAbruptly);
}
// 从队列获取任务
private Runnable getTask() {
for (;;) {
int wc = workerCountOf(ctl.get());
// 是否需要超时回收
boolean timed = wc > corePoolSize;
// 从队列获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时等待
workQueue.take(); // 无限等待(核心线程)
if (r != null) return r;
// 超时未获得任务,非核心线程可以回收
if (timed && timedOut) {
return null; // 返回null使Worker退出循环
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 08.Android中的线程池最佳实践
# 8.1 常见的线程池配置
// Android推荐的线程池配置
// 1.CPU密集型任务(计算、编解码)
ExecutorService cpuExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), // 核心=CPU核心数
Runtime.getRuntime().availableProcessors(), // 最大=CPU核心数
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(128));
// 2.IO密集型任务(网络、文件IO)
ExecutorService ioExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2, // 核心=CPU核心数*2
Runtime.getRuntime().availableProcessors() * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(256));
// 3.串行任务(数据库操作等)
ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
// 4.定时任务
ScheduledExecutorService scheduledExecutor =
Executors.newScheduledThreadPool(2);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 8.2 全局线程池管理
// 推荐:统一的线程池管理
object AppExecutors {
// IO线程池
val io: ExecutorService = ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
LinkedBlockingQueue(128),
ThreadFactory { r -> Thread(r, "app-io-${threadId.getAndIncrement()}") }
)
// 计算线程池
val compute: ExecutorService = ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
0, TimeUnit.MILLISECONDS,
LinkedBlockingQueue(64),
ThreadFactory { r -> Thread(r, "app-compute-${threadId.getAndIncrement()}") }
)
// 主线程
val mainThread: Executor = Executor { command ->
Handler(Looper.getMainLooper()).post(command)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 8.3 线程池的监控
线程池问题排查:
常见问题:
├── 线程泄漏:线程创建后不回收
│ └── 监控:观察Thread数量是否持续增长
│
├── 任务堆积:队列满导致拒绝
│ └── 监控:workQueue.size()
│
├── 线程饥饿:核心线程全部阻塞
│ └── 监控:getActiveCount() == getPoolSize()
│
└── OOM:线程过多耗尽内存
└── 每个线程默认1MB栈空间
└── 100个线程 = 100MB内存
调试命令:
adb shell kill -3 <pid> # 生成线程dump
adb shell cat /proc/<pid>/status | grep Threads # 查看线程数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 09.synchronized与对象锁原理
# 9.1 synchronized在ART中的实现
ART中synchronized的锁实现:
对象头中的锁状态(monitor_字段):
┌──────────────────────────────────┐
│ monitor_ (32位) │
├──────────────────────────────────┤
│ [31-30] LockWord状态: │
│ 00 = 未锁定(Unlocked) │
│ 01 = 轻量级锁(ThinLock) │
│ 10 = 胖锁(FatLock/Monitor) │
│ 11 = 哈希状态(HashState) │
├──────────────────────────────────┤
│ 轻量级锁: [29-16]锁持有者线程ID │
│ [15-0] 锁重入计数 │
├──────────────────────────────────┤
│ 胖锁: [29-0] Monitor对象指针 │
└──────────────────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 9.2 锁的升级过程
ART中锁的升级(锁膨胀)过程:
无锁 → 轻量级锁 → 胖锁
1. 无锁状态
monitor_ = 0
没有线程持有锁
2. 轻量级锁(ThinLock)
线程A尝试加锁:
└── CAS设置monitor_中的线程ID
└── 成功 → 获得轻量级锁
└── 重入 → 增加计数(最多4096次)
线程B尝试加锁(竞争):
└── CAS失败(线程A持有锁)
└── 自旋等待(短时间)
└── 自旋失败 → 升级为胖锁
3. 胖锁(FatLock/Monitor)
└── 创建Monitor对象
└── 包含:
│ ├── owner线程
│ ├── 等待队列(wait set)
│ ├── 阻塞队列(entry set)
│ └── 递归计数
└── 竞争线程进入阻塞状态
└── 底层使用futex系统调用
锁升级是不可逆的:一旦膨胀为胖锁,不会降级回轻量级锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 9.3 wait/notify的实现
wait/notify的底层机制:
synchronized(obj) {
obj.wait(); // 释放锁,进入等待
}
// ART中的实现
Monitor::Wait(Thread* self, int64_t ms) {
// 1.释放Monitor锁
owner_ = nullptr;
lock_count = 0;
// 2.将当前线程加入wait set
AppendToWaitSet(self);
// 3.通知其他等待锁的线程
// 有线程可以获得锁了
// 4.当前线程进入等待状态
if (ms == 0) {
futex_wait(...); // 无限等待
} else {
futex_wait_timeout(ms); // 超时等待
}
// 5.被notify唤醒后,重新竞争锁
Lock(self);
// 6.恢复锁重入计数
lock_count = saved_lock_count;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 10.volatile与内存可见性
# 10.1 为什么需要volatile
没有volatile时的可见性问题:
CPU核心0 CPU核心1
┌──────────┐ ┌──────────┐
│ 寄存器 │ │ 寄存器 │
│ flag=true │ │ flag=??? │
├──────────┤ ├──────────┤
│ L1 Cache │ │ L1 Cache │
│ flag=true │ │ flag=false│ ← 还是旧值!
├──────────┤ ├──────────┤
│ L2 Cache │ │ L2 Cache │
│ flag=??? │ │ flag=false│
└──────────┘ └──────────┘
\ /
\ /
┌───────────────────┐
│ 主内存 (RAM) │
│ flag = true │ ← 核心0已写入
└───────────────────┘
问题:核心0写入flag=true后
核心1可能还从自己的L1 Cache读到flag=false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 10.2 volatile的内存语义
// volatile保证两件事
volatile boolean flag = false;
// 1.可见性:写入后立即对其他线程可见
// 写volatile:刷新CPU缓存到主内存
// 读volatile:从主内存读取最新值
// 2.有序性:禁止指令重排序
// volatile写之前的操作不会被重排到volatile写之后
// volatile读之后的操作不会被重排到volatile读之前
// 底层实现:内存屏障
// ARM架构上:
// volatile写 → DMB (Data Memory Barrier) 指令
// volatile读 → DMB 指令
// 注意:volatile不保证原子性
volatile int count = 0;
count++; // 不是原子操作!
// 分解为:读count → +1 → 写count
// 多线程同时执行可能丢失更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 11.CAS与原子操作
# 11.1 CAS的原理
CAS (Compare And Swap) 操作:
原理:
CAS(内存地址, 期望值, 新值)
if (内存地址的当前值 == 期望值) {
设置为新值
return true
} else {
return false // 被其他线程修改了
}
硬件保证原子性:
ARM架构: LDREX/STREX (Load/Store Exclusive)
x86架构: LOCK CMPXCHG
Java中的CAS:
AtomicInteger.compareAndSet(expect, update)
└── Unsafe.compareAndSwapInt()
└── CPU原子指令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 11.2 AtomicInteger的实现
// AtomicInteger的核心实现
public class AtomicInteger extends Number {
private volatile int value; // volatile保证可见性
// CAS操作
public final boolean compareAndSet(int expect, int update) {
return U.compareAndSetInt(this, VALUE, expect, update);
}
// 原子自增
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
// Unsafe.getAndAddInt的实现
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset); // 读取当前值
} while (!compareAndSetInt(o, offset, v, v + delta)); // CAS循环
return v;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 11.3 ABA问题
CAS的ABA问题:
线程1:读取值A,准备更新为B
线程2:将A改为B,再改回A
线程1:CAS检查,值仍为A → 成功(但中间已经变化过了!)
解决方案:AtomicStampedReference
每次修改时同时更新版本号
CAS时不仅比较值,还比较版本号
AtomicStampedReference<Integer> ref =
new AtomicStampedReference<>(100, 0);
int[] stampHolder = new int[1];
int value = ref.get(stampHolder); // value=100, stamp=0
ref.compareAndSet(100, 200, 0, 1); // 值和stamp都匹配才更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 12.并发容器原理
# 12.1 ConcurrentHashMap
ConcurrentHashMap的演进(Java 8+):
Java 7之前:分段锁
├── 将数据分成多个Segment
├── 每个Segment独立加锁
└── 最多Segment数(默认16)个线程并发
Java 8+:CAS + synchronized
├── 数组 + 链表 + 红黑树
├── 对数组每个桶单独加锁
├── 空桶:CAS插入
├── 非空桶:synchronized锁住头节点
└── 链表长度>8 → 转为红黑树
为什么比Hashtable快:
Hashtable: 整个表一把锁 → 同时只有1个线程操作
ConcurrentHashMap: 每个桶一把锁 → 不同桶可以并行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 12.2 CopyOnWriteArrayList
CopyOnWriteArrayList的原理:
读操作:不加锁,直接读取内部数组
写操作:
1. 加锁(ReentrantLock)
2. 复制一份新数组
3. 在新数组上修改
4. 将引用指向新数组
5. 解锁
优点:读操作完全无锁,读多写少场景性能好
缺点:写操作需要复制数组,写多场景不适合
适用:事件监听器列表、配置列表等读多写少的场景
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 12.3 BlockingQueue
BlockingQueue(线程池的核心组件):
ArrayBlockingQueue:
├── 有界队列,固定大小
├── 数组实现
└── 一把锁(ReentrantLock) + 两个条件(notEmpty/notFull)
LinkedBlockingQueue:
├── 有界/无界队列
├── 链表实现
└── 两把锁(put锁/take锁) → 写入和取出可以并发
SynchronousQueue:
├── 不存储元素的队列
├── 每个put必须等待一个take
└── 适用于直接传递任务(CachedThreadPool使用)
PriorityBlockingQueue:
├── 无界优先级队列
├── 堆实现
└── 按优先级取出元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 13.Kotlin协程原理
# 13.1 协程的本质
疑惑:协程和线程有什么区别?协程是不是比线程更轻量?
答疑:协程的本质是编译器生成的状态机,它可以在不阻塞线程的情况下挂起和恢复执行。
协程的本质:
// Kotlin代码
suspend fun fetchUser(): User {
val token = login() // 挂起点1
val user = getUser(token) // 挂起点2
return user
}
// 编译器转换后的伪代码(状态机)
class FetchUserStateMachine(completion: Continuation<User>) : ContinuationImpl {
var label = 0 // 当前状态
var token: String? // 中间结果
fun invokeSuspend(result: Any?) {
when (label) {
0 -> {
label = 1 // 设置下一个状态
val suspended = login(this) // 传入自己作为回调
if (suspended == COROUTINE_SUSPENDED) return // 挂起
}
1 -> {
token = result as String // 恢复,获取login结果
label = 2
val suspended = getUser(token, this)
if (suspended == COROUTINE_SUSPENDED) return
}
2 -> {
val user = result as User // 恢复,获取getUser结果
completion.resumeWith(Result.success(user))
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 13.2 协程的挂起与恢复
协程挂起与恢复的过程:
线程A (Main) 线程B (IO)
│
├── 启动协程
│ label=0
│ 调用login(continuation)
│ login是suspend函数
│ 返回COROUTINE_SUSPENDED
│ │
│ └── 协程被挂起
│ 线程A继续处理其他消息
│ (不阻塞!) ├── 执行网络请求
│ ├── 请求完成
│ └── continuation.resume(token)
│ │
│ ◄────────── 切换到线程A ──────────┘
│ label=1
│ 获取token
│ 调用getUser(token, continuation)
│ ...
关键点:
挂起 = 保存状态机状态 + 释放线程
恢复 = 恢复状态机状态 + 在指定线程继续执行
协程挂起时不占用任何线程!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 13.3 协程调度器
// 协程调度器决定协程在哪个线程执行
Dispatchers.Main // 主线程(Android的主Looper)
Dispatchers.IO // IO线程池(适合IO操作)
Dispatchers.Default // 默认线程池(适合CPU计算)
Dispatchers.Unconfined // 不切换线程(在当前线程执行)
// Dispatchers.Main的实现
// HandlerDispatcher: 通过Handler.post将任务发送到主线程
class HandlerContext(
private val handler: Handler
) : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block) // 通过Handler切换到主线程
}
}
// Dispatchers.IO的实现
// 共享Default调度器的线程池,但允许更多并行
// 最大线程数 = max(64, CPU核心数)
// 用于IO密集型任务(网络、文件、数据库)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 14.协程与线程的关系
# 14.1 协程运行在线程上
协程与线程的关系图:
线程池(如Dispatchers.Default)
┌──────────────────────────────────┐
│ Thread-1 │ Thread-2 │ Thread-3│
│ ┌──────┐ │ ┌──────┐ │ │
│ │协程A │ │ │协程B │ │ │
│ │(运行) │ │ │(运行) │ │ (空闲) │
│ └──────┘ │ └──────┘ │ │
│ │ │ ┌──────┐│
│ │ │ │协程C ││
│ │ │ │(等待) ││
│ │ │ └──────┘│
└──────────────────────────────────┘
1个线程可以运行多个协程(分时)
1个协程在挂起后可以在不同线程恢复
协程挂起时不占用线程(线程可以运行其他协程)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 14.2 协程vs线程的开销对比
协程vs线程对比:
创建开销:
线程:~0.5-1ms + 1MB栈内存
协程:~0.01ms + ~几百字节
切换开销:
线程切换:~1-10μs(内核态切换,保存/恢复寄存器)
协程切换:~0.1μs(用户态切换,只切换状态机状态)
数量限制:
线程:数百个(受内存限制,每个1MB栈)
协程:数百万个(非常轻量)
使用场景:
线程:真正的并行计算(多CPU核心)
协程:高并发IO操作(同时处理大量请求)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 14.3 结构化并发
// 协程的结构化并发
// 父协程取消 → 所有子协程都被取消
// 子协程异常 → 传播给父协程
coroutineScope {
val deferred1 = async { fetchUser() } // 子协程1
val deferred2 = async { fetchOrders() } // 子协程2
// 等待两个子协程都完成
val user = deferred1.await()
val orders = deferred2.await()
// 如果fetchUser()抛出异常:
// 1. deferred2会被取消
// 2. 异常传播到coroutineScope
// 3. 调用者收到异常
}
// SupervisorScope:子协程异常不影响其他子协程
supervisorScope {
val deferred1 = async { fetchUser() } // 失败
val deferred2 = async { fetchOrders() } // 继续执行
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 15.RxJava线程调度原理
# 15.1 RxJava的线程调度
// RxJava的线程切换
Observable.create(emitter -> {
// 在IO线程执行
String data = fetchFromNetwork();
emitter.onNext(data);
emitter.onComplete();
})
.subscribeOn(Schedulers.io()) // 上游在IO线程
.observeOn(AndroidSchedulers.mainThread()) // 下游在主线程
.subscribe(data -> {
// 在主线程更新UI
textView.setText(data);
});
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 15.2 subscribeOn的实现原理
// subscribeOn的实现
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<>(this, scheduler);
}
// ObservableSubscribeOn
class ObservableSubscribeOn<T> extends Observable<T> {
final Observable<T> source;
final Scheduler scheduler;
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 将subscribe操作调度到指定线程
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
// 在指定线程中执行上游的subscribe
source.subscribe(observer);
});
}
}
// subscribeOn只有第一次调用有效
// 因为它控制的是subscribe的执行线程
// subscribe从下游到上游传递,只要最上面一层在指定线程
// 整个订阅链都在该线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 15.3 observeOn的实现原理
// observeOn的实现
public final Observable<T> observeOn(Scheduler scheduler) {
return new ObservableObserveOn<>(this, scheduler);
}
// ObservableObserveOn
class ObservableObserveOn<T> extends Observable<T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker worker = scheduler.createWorker();
// 包装下游observer
source.subscribe(new ObserveOnObserver<>(observer, worker));
}
}
class ObserveOnObserver<T> implements Observer<T> {
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final SpscLinkedArrayQueue<T> queue; // 队列缓冲
@Override
public void onNext(T t) {
// 将数据放入队列
queue.offer(t);
// 调度到目标线程处理
worker.schedule(this::drainQueue);
}
void drainQueue() {
// 在目标线程中取出数据交给下游
T value = queue.poll();
downstream.onNext(value);
}
}
// observeOn可以多次切换线程
// 每次observeOn都在该点切换后续操作的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 15.4 AndroidSchedulers.mainThread()
// RxAndroid中主线程调度器的实现
class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler; // 主线程Handler
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
static class HandlerWorker extends Worker {
void schedule(Runnable run, long delay, TimeUnit unit) {
// 通过Handler.postDelayed将任务发到主线程
handler.postDelayed(run, unit.toMillis(delay));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 16.总结与技术思考
# 16.1 核心要点回顾
Android线程与并发编程核心要点:
1. Android是单线程UI模型
└── UI操作必须在主线程
└── 主线程通过Looper消息驱动
2. 线程创建有开销,使用线程池复用
└── ThreadPoolExecutor的任务调度逻辑
└── 根据任务类型选择合适的线程池配置
3. 锁的优化路径
└── 无锁(CAS) → 轻量级锁(ThinLock) → 胖锁(Monitor)
└── volatile保证可见性但不保证原子性
4. 协程是编译器生成的状态机
└── 挂起时不占用线程
└── 通过Dispatcher调度到不同线程
└── 结构化并发管理生命周期
5. 线程切换的底层实现
└── Handler: 通过消息队列切换
└── 协程Dispatcher: 底层仍是Handler/线程池
└── RxJava Scheduler: 类似机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 16.2 方案选型建议
并发方案选型:
简单的异步操作(一次性任务):
└── Kotlin协程 withContext(Dispatchers.IO)
定时/周期性任务:
└── ScheduledExecutorService 或 协程 delay
保证任务完成(即使应用退出):
└── WorkManager
响应式数据流:
└── Flow (Kotlin) 或 RxJava
大量并发IO:
└── 协程 (可创建大量协程)
CPU密集型计算:
└── 线程池 (Dispatchers.Default)
与View生命周期绑定:
└── lifecycleScope / viewModelScope
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 16.3 面试高频问题
问题:协程和线程的区别?
- 线程是OS级别的并发单元,有独立栈空间(~1MB),切换需要内核参与
- 协程是编译器生成的状态机,非常轻量(~几百字节),切换在用户态完成
- 协程运行在线程之上,挂起时不阻塞线程
问题:synchronized的锁升级过程?
- 无锁→轻量级锁(CAS设置线程ID)→胖锁(Monitor对象)
- 轻量级锁适合无竞争或短暂竞争场景
- 一旦升级为胖锁不会降级
问题:线程池的任务调度顺序?
- 先看核心线程是否满→满则入队→队列满则创建非核心线程→都满则拒绝
- 注意:不是先创建到最大线程数再入队
问题:observeOn和subscribeOn的区别?
- subscribeOn控制上游执行线程,多次调用只有第一次生效
- observeOn控制下游执行线程,可以多次调用切换线程
上次更新: 2026/06/10, 11:13:41