Java锁与并发控制
Lock 接口¶
ReentrantLock¶
ReentrantLock 是 Java 中提供的一种可重入独占锁。
1. 基本使用¶
- lock() :尝试获取锁,拿不到就一直等待(直到被抢占)
- tryLock():有返回值的 lock(),注意,返回返回值后可以继续执行后面的代码,所以后面的代码需要加一个 if 判断返回值是否为 true
- tryLock(long timeout, TimeUnit unit):等到了锁或时间到了就会返回,可被中断
- lockInterruptibly():一直等,可被中断
- unlock():将获取到的锁解锁
2. 公平锁¶
如果有多个线程在等待锁,公平锁会按照线程请求锁的顺序授予锁的许可,即先来先得的原则。
ReentrantLock
默认是非公平锁,如果想要改变它,在定义的时候传递一个 true 即可:
场景:现在存在 2 个线程,每一个线程在释放锁之后,再次获取一次锁,因为是公平锁的原因,因此两个线程会交替执行 1 -> 2 -> 1 -> 2
当前线程放锁后会有一个唤醒机制,唤醒队列头部的线程(需要一定时间),不允许此时其他任何线程争抢锁
3. 非公平锁¶
非公平锁最大的特点就是当一个任务获取锁的时候,如果恰好前面的线程释放锁,此时当前任务不再进行排队,直接插队执行任务。非公平锁在高并发场景下,会省略大量的唤醒线程的操作,但是极端情况下会造成等待队列中的任务一直被插队一直执行不了。
4. 可中断锁¶
可中断锁就是线程在等待锁的过程中可以响应中断信号,如果一个线程在等待锁时被中断,它会立即退出等待状态,并抛出 InterruptedException
。这种锁的主要目的是提供更好的线程控制,以避免线程在等待锁的过程中无限期地阻塞。
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "尝试获取锁." );
LOCK.lockInterruptibly();
try {
System.out.println(Thread.currentThread().getName() + "获取到了锁." );
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
LOCK.unlock();
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "等待锁的时候被中断,结束等待." );
}
}
ReentrantReadWriteLock¶
ReentrantReadWriteLock 实现了 ReadWriteLock 接口,它可以用于管理多个线程对共享资源的并发访问。与标准的互斥锁不同,读写锁将锁分为两种类型:读锁(共享锁)和写锁(排他锁)。这种分离的锁类型允许多个线程同时读取共享资源,但在进行写操作时需要排他地获取锁。
从 ReentrantReadWriteLock 中可以衍生出 ReadLock(读锁)和 WriteLock(写锁)。
- 读锁:读锁的类型是共享锁,它允许多个线程同时操作临界区。
- 写锁:写锁的类型是排他锁,它允许同时只能有一个线程占有写锁。
写锁是独占锁,写锁持有期间,不允许有其他的读锁或者写锁占有
readLock()
返回一个Lock
类型的读锁。writeLock()
返回一个Lock
类型的写锁。
1. 读锁¶
ReadLock
是ReentrantReadWriteLock
内部的一个静态类,实现了Lock
接口。
ReentrantReadWriteLock
并没有直接实现Lock
接口,而是实现了ReadWriteLock
接口。
private final static ReentrantReadWriteLock RRWL = new ReentrantReadWriteLock();
// 定义读锁
private final static ReentrantReadWriteLock.ReadLock RL = RRWL.readLock();
这里有点类似 Map.Entry
e : mp.entrySet(); 的语法 同一个 RRWL 实例的读写和写锁可以互相影响
2. 写锁¶
定义方法同理
3. 读写锁的公平性¶
非公平模式的行为(不传入 true 的默认设定)
- 读锁插队:如果队列头部的线程持有读锁,新的读线程可以直接尝试获取读锁,而不需要排队。这种设计允许读操作并发执行,从而提高吞吐量。
- 写锁不允许插队:如果队列头部的线程持有写锁,新的读线程或写线程都不能插队。写锁是独占的,必须等待当前写锁释放后才能获取锁。这种设计是为了避免写线程长时间等待,防止写饥饿问题。
这种设计在读多写少的场景下能够显著提升性能,同时避免写线程饥饿问题(防止源源不断的读锁过来读取数据,导致写锁迟迟得不到执行)
并发流程控制¶
CountDownLatch¶
1. 基本概念¶
它允许一个或多个线程等待一组操作完成后再继续执行。它的名字暗示着一个 计数器,它被初始化为一个正整数,当计数器的值达到零时,等待的线程可以继续执行。
它在多线程编程中非常有用,特别是在协调多个线程完成某项任务时
。一些常见的应用场景包括:等待线程池中的所有任务完成、多个线程协作执行某个操作、等待多个服务初始化完成等。
构造初始化:
- await()
- await(long timeout, TimeUnit unit)
await 跳出阻塞的条件:
- CDL 的计数变为了 0
- 到达了设置的超时时间
- 线程被中断,抛出 InterruptException
- countDown() 将计数器减一(此方法线程安全)
- getCount() 获取计数器的剩余次数
2. 使用实例¶
需求如下:
- 4 个开发等 1 个产品经理的原型和 PRD,“多等一”的场景。
- 1 个运维等待 4 个人开发完毕后上线,“一等多”的场景。
public class DevelopCountDownLatchTest {
private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
/**
* 产品经理的计数器
*/
protected final static CountDownLatch DEMAND_COUNT = new CountDownLatch(1);
/**
* 开发人员进度的计数器
*/
protected final static CountDownLatch DEVELOP_COUNT = new CountDownLatch(4);
public static void main(String[] args) throws Exception {
EXECUTOR.execute(new ProjectDevelop("java小红"));
EXECUTOR.execute(new ProjectDevelop("java小绿"));
EXECUTOR.execute(new ProjectDevelop("java小蓝"));
EXECUTOR.execute(new ProjectDevelop("java小紫"));
EXECUTOR.execute(new ProjectDemandPrd("需求小王"));
EXECUTOR.execute(new OperationUp("运维奇奇"));
}
/**
* 运维上线的任务
*/
private static class OperationUp implements Runnable {
private final String name;
private OperationUp(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + "正在等待开发完成...");
//运维开始等待项目开发完毕上线
DEVELOP_COUNT.await();
System.out.println("项目开发完毕,运维" + name + "开始上线.");
System.out.println("上线成功..");
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 需求设计PRD原型的任务
*/
private static class ProjectDemandPrd implements Runnable {
private final String name;
private ProjectDemandPrd(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + "产品经理此时正在紧张的设计原型和PRD.....");
TimeUnit.SECONDS.sleep(3);
System.out.println(name + "原型设计完毕.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
/* 必须保证可以执行 */
DEMAND_COUNT.countDown();
}
}
}
/**
* 开发们开发代码的任务
*/
private static class ProjectDevelop implements Runnable {
private final String name;
private ProjectDevelop(String name) {
this.name = name;
}
@Override
public void run() {
try {
// 等待 CDL
DEMAND_COUNT.await();
System.out.println(name + "获取到了原型和PRD,开始开发.");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(name + "开发完毕.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
DEVELOP_COUNT.countDown();
}
}
}
}
CyclicBarrier¶
1. 基本概念¶
CyclicBarrier 也是等待线程计数器递减至 0 后再执行后续操作。
不同于 CountDownLatch,CyclicBarrier 在所有线程完成计算任务(计数器归零)时,会触发内部的回调函数执行额外的操作。
- CountDownLatch 一旦计数器减为零,就无法再次使用,适用于一次性的等待任务。计数器归零后,无法重置或重新使用。
- CyclicBarrier 更具灵活性。当 CyclicBarrier 的计数器归零时,可以通过reset()方法重新设置计数器,使其可以在后续的同步点中再次使用。
定义方式:
不携带回调:
携带回调(重写 Runnable 接口的 run 方法):
通过匿名内部类(实例)创建了一个 Runnable 接口的对象
回调 是指当所有线程都到达屏障点(即调用 await()
的线程数量达到了 CyclicBarrier
设置的阈值)时,会触发一个额外的操作,这个操作通常是一个 Runnable
任务。这个 Runnable
任务会在所有线程继续执行之前被优先执行。
Cyclic Barrier 不需要 countDown() 方法
2. 场景实例¶
现在公司有一个需求,需要 4 名程序员在获取到产品经理的原型和 PRD 后才能开始开发,开发完成需要 1 名测试完成常规的测试后再安排 1 名运维上线服务。
public class DevelopAndTestCountDownLatchTest {
private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
/**
* 产品经理
*/
private static final CyclicBarrier PRD_COUNT = new CyclicBarrier(1, new StartDevelop());
/**
* 开发人员
*/
private static final CyclicBarrier DEVELOP_COUNT = new CyclicBarrier(4, new TestCode());
/**
* 测试人员
*/
private static final CyclicBarrier TEST_COUNT = new CyclicBarrier(1, new OperationTopLineCode());
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
System.out.println("产品经理此时正在紧张的设计原型和PRD.....");
TimeUnit.SECONDS.sleep(3);
System.out.println("原型设计完毕.");
PRD_COUNT.await();
}
/**
* 产品经理资料准备齐全后的回调
*/
private static class StartDevelop implements Runnable {
@Override
public void run() {
EXECUTOR.execute(new DevelopCode("java小红"));
EXECUTOR.execute(new DevelopCode("java小绿"));
EXECUTOR.execute(new DevelopCode("java小蓝"));
EXECUTOR.execute(new DevelopCode("java小紫"));
}
}
/**
* 开发人员开始进行开发代码
*/
private static class DevelopCode implements Runnable {
private final String name;
private DevelopCode(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + "开始开发代码.......");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(name + "完成了代码开发!");
//等待其他人完成开发
DEVELOP_COUNT.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 测试人员的测试任务
*/
private static class TestCode implements Runnable {
@Override
public void run() {
try {
System.out.println("开发人员全部都开发完成了,测试人员开始测试.");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("测试人员完成测试,服务没有问题,可以准备上线了.");
TEST_COUNT.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 运维上线代码
*/
private static class OperationTopLineCode implements Runnable{
@Override
public void run() {
try {
System.out.println("检测到测试完成,运维开始上线代码");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("上线完成");
//上线完成后关闭线程池
EXECUTOR.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Semaphore¶
中文信号量,允许多个线程在同一时刻访问共享资源,但限制了可以同时访问资源的线程的数量:维护一个计数器,该计数器表示可用的许可证数量,线程在访问资源前必须先获取许可证。如果许可证数量耗尽,后续的线程必须等待其他线程释放许可证,以便获得访问权。
- aquire():试图获得许可证
- release():释放获得的许可证
- 其他方法可获 / 释放得多个许可证 / 设置等待时间等
当需要对线程访问资源的并发数量和速率进行精确控制时,
Semaphore
提供了一种有效的解决方案。在实际应用场景中,我们可以将Semaphore
用于实现令牌桶算法,确保请求只有在成功获取令牌的情况下才能被转发到服务上。这种方式不仅有效地控制了并发访问速率,还可以防止资源的过度消耗,保护服务的稳定性和可用性。
Condition¶
1. 基本概念¶
一种线程间的通信机制,和 wait / notify 相似,
Condition
的await()
和Object
的wait()
在被唤醒后都可以继续执行下一行代码
Condition
通常与 ReentrantLock
或 ReentrantReadWriteLock
配合使用,用于管理线程的等待和通知机制。Condition允许线程在满足特定条件之前等待,以实现更复杂的同步控制。
- await():必须在 lock() 加锁块中使用,可以被中断,等待其他线程的唤醒
- signal():公平锁的情况下,唤醒一个等待时间最长的线程。非公平锁的情况下,不会遵循先进先出的唤醒顺序。非公平锁更加倾向于允许刚刚阻塞的线程立即获得锁,而不考虑等待时间的长短。
- signalAll():唤醒所有条件关联的线程
- 略
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
condition.await(); / condition.signal();
} finally {
lock.unlock();
}
2. 场景实例¶
假设有一个停车场,该停车场有 4 个停车位。车辆可以进入到停车场停车,如果当前停车位已满,车辆就需要等待停车位。
public class ParkingLot {
private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
private final static ReentrantLock LOCK = new ReentrantLock();
private final static Condition CONDITION = LOCK.newCondition();
/**
* 停车场总位置数量
*/
private final int totalParkingSpaces;
/**
* 已经停了多少量
*/
private int occupiedSpaces = 0;
public ParkingLot(int totalParkingSpaces) {
this.totalParkingSpaces = totalParkingSpaces;
}
public static void main(String[] args) {
ParkingLot parkingLot = new ParkingLot(5);
for (int i = 0; i < 10; i++) {
EXECUTOR.execute(new CarActive(parkingLot, "车辆"+i));
}
}
/**
* 尝试进入停车场
*/
public void park(String name){
LOCK.lock();
try {
if(occupiedSpaces >= totalParkingSpaces){
// 如果停车场已满,等待
System.out.println(name + ": 车辆等待停车位...");
// 开始等待车位
CONDITION.await();
}
occupiedSpaces++;
System.out.println(name + ": 车辆成功停车,剩余的停车位:" + (totalParkingSpaces - occupiedSpaces));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
/**
* 驶离停车场
*/
public void bearOff(String name){
LOCK.lock();
try {
// 离开停车场 将已占用的数量-1
occupiedSpaces--;
System.out.println(name + ": 车辆离开停车场,剩余停车位: " + (totalParkingSpaces - occupiedSpaces));
// 通知等待的车辆有空位了
CONDITION.signal();
}finally {
LOCK.unlock();
}
}
private static class CarActive implements Runnable {
private final ParkingLot parkingLot;
private final String name;
private CarActive(ParkingLot parkingLot, String name) {
this.parkingLot = parkingLot;
this.name = name;
}
@Override
public void run() {
try {
parkingLot.park(name);
Thread.sleep((long) (Math.random() * 10000));
parkingLot.bearOff(name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}