跳转至

Java锁与并发控制

Lock 接口

image-20241215165631492

ReentrantLock

ReentrantLock 是 Java 中提供的一种可重入独占锁

1. 基本使用

  • lock() :尝试获取锁,拿不到就一直等待(直到被抢占)
  • tryLock():有返回值的 lock(),注意,返回返回值后可以继续执行后面的代码,所以后面的代码需要加一个 if 判断返回值是否为 true
  • tryLock(long timeout, TimeUnit unit):等到了锁或时间到了就会返回,可被中断
  • lockInterruptibly():一直等,可被中断
  • unlock():将获取到的锁解锁
lock.lock();
try {

} catch () {

} finally {
    lock.unlock();
}

2. 公平锁

如果有多个线程在等待锁,公平锁会按照线程请求锁的顺序授予锁的许可,即先来先得的原则。

ReentrantLock默认是非公平锁,如果想要改变它,在定义的时候传递一个 true 即可:

protected final static ReentrantLock LOCK = new ReentrantLock(true);

场景:现在存在 2 个线程,每一个线程在释放锁之后,再次获取一次锁,因为是公平锁的原因,因此两个线程会交替执行 1 -> 2 -> 1 -> 2

当前线程放锁后会有一个唤醒机制,唤醒队列头部的线程(需要一定时间),不允许此时其他任何线程争抢锁

3. 非公平锁

非公平锁最大的特点就是当一个任务获取锁的时候,如果恰好前面的线程释放锁,此时当前任务不再进行排队,直接插队执行任务。非公平锁在高并发场景下,会省略大量的唤醒线程的操作,但是极端情况下会造成等待队列中的任务一直被插队一直执行不了。

4. 可中断锁

可中断锁就是线程在等待锁的过程中可以响应中断信号,如果一个线程在等待锁时被中断,它会立即退出等待状态,并抛出 InterruptedException。这种锁的主要目的是提供更好的线程控制,以避免线程在等待锁的过程中无限期地阻塞。

@Override
public  void run() {
    try {
        System.out.println(Thread.currentThread().getName() + "尝试获取锁." );
        LOCK.lockInterruptibly();
        try {
            System.out.println(Thread.currentThread().getName() + "获取到了锁." );
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }finally {
            LOCK.unlock();
        }
    } catch (InterruptedException e) {
        System.out.println(Thread.currentThread().getName() + "等待锁的时候被中断,结束等待." );
    }
}

ReentrantReadWriteLock

ReentrantReadWriteLock 实现了 ReadWriteLock 接口,它可以用于管理多个线程对共享资源的并发访问。与标准的互斥锁不同,读写锁将锁分为两种类型:读锁(共享锁)和写锁(排他锁)。这种分离的锁类型允许多个线程同时读取共享资源,但在进行写操作时需要排他地获取锁。

从 ReentrantReadWriteLock 中可以衍生出 ReadLock(读锁)和 WriteLock(写锁)。

  • 读锁:读锁的类型是共享锁,它允许多个线程同时操作临界区。
  • 写锁:写锁的类型是排他锁,它允许同时只能有一个线程占有写锁。

写锁是独占锁,写锁持有期间,不允许有其他的读锁或者写锁占有

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}
  • readLock() 返回一个 Lock 类型的读锁。
  • writeLock() 返回一个 Lock 类型的写锁。

1. 读锁

ReadLockReentrantReadWriteLock 内部的一个静态类,实现了 Lock 接口。

ReentrantReadWriteLock 并没有直接实现 Lock 接口,而是实现了 ReadWriteLock 接口。

private final static ReentrantReadWriteLock RRWL = new ReentrantReadWriteLock();
// 定义读锁
private final static ReentrantReadWriteLock.ReadLock RL = RRWL.readLock();

这里有点类似 Map.Entry e : mp.entrySet(); 的语法

同一个 RRWL 实例的读写和写锁可以互相影响

2. 写锁

定义方法同理

3. 读写锁的公平性

ReentrantReadWriteLock REENTRANT_READ_WRITE_LOCK = new ReentrantReadWriteLock(true);

非公平模式的行为(不传入 true 的默认设定)

  • 读锁插队:如果队列头部的线程持有读锁,新的读线程可以直接尝试获取读锁,而不需要排队。这种设计允许读操作并发执行,从而提高吞吐量。
  • 写锁不允许插队:如果队列头部的线程持有写锁,新的读线程或写线程都不能插队。写锁是独占的,必须等待当前写锁释放后才能获取锁。这种设计是为了避免写线程长时间等待,防止写饥饿问题。

这种设计在读多写少的场景下能够显著提升性能,同时避免写线程饥饿问题(防止源源不断的读锁过来读取数据,导致写锁迟迟得不到执行)

并发流程控制

CountDownLatch

1. 基本概念

允许一个或多个线程等待一组操作完成后再继续执行。它的名字暗示着一个 计数器,它被初始化为一个正整数,当计数器的值达到零时,等待的线程可以继续执行。

它在多线程编程中非常有用,特别是在协调多个线程完成某项任务时。一些常见的应用场景包括:等待线程池中的所有任务完成、多个线程协作执行某个操作、等待多个服务初始化完成等。

构造初始化:

CountDownLatch CDL = new CountDownLatch(5); // 计数器初始化为 5
  • await()
  • await(long timeout, TimeUnit unit)

await 跳出阻塞的条件:

  1. CDL 的计数变为了 0
  2. 到达了设置的超时时间
  3. 线程被中断,抛出 InterruptException
  • countDown() 将计数器减一(此方法线程安全)
  • getCount() 获取计数器的剩余次数
try {
    CDL0.await();
} catch {

} finally {
    CDL1.countDown();
}

2. 使用实例

需求如下:

  1. 4 个开发等 1 个产品经理的原型和 PRD,“多等一”的场景。
  2. 1 个运维等待 4 个人开发完毕后上线,“一等多”的场景。
public class DevelopCountDownLatchTest {

    private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

    /**
     * 产品经理的计数器
     */
    protected final static CountDownLatch DEMAND_COUNT = new CountDownLatch(1);
    /**
     * 开发人员进度的计数器
     */
    protected final static CountDownLatch DEVELOP_COUNT = new CountDownLatch(4);

    public static void main(String[] args) throws Exception {
        EXECUTOR.execute(new ProjectDevelop("java小红"));
        EXECUTOR.execute(new ProjectDevelop("java小绿"));
        EXECUTOR.execute(new ProjectDevelop("java小蓝"));
        EXECUTOR.execute(new ProjectDevelop("java小紫"));
        EXECUTOR.execute(new ProjectDemandPrd("需求小王"));
        EXECUTOR.execute(new OperationUp("运维奇奇"));
    }

    /**
     * 运维上线的任务
     */
    private static class OperationUp implements Runnable {

        private final String name;

        private OperationUp(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + "正在等待开发完成...");
                //运维开始等待项目开发完毕上线
                DEVELOP_COUNT.await();
                System.out.println("项目开发完毕,运维" + name + "开始上线.");
                System.out.println("上线成功..");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 需求设计PRD原型的任务
     */
    private static class ProjectDemandPrd implements Runnable {

        private final String name;

        private ProjectDemandPrd(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + "产品经理此时正在紧张的设计原型和PRD.....");
                TimeUnit.SECONDS.sleep(3);
                System.out.println(name + "原型设计完毕.");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                /* 必须保证可以执行 */ 
                DEMAND_COUNT.countDown();
            }
        }
    }

    /**
     * 开发们开发代码的任务
     */
    private static class ProjectDevelop implements Runnable {

        private final String name;

        private ProjectDevelop(String name) {
            this.name = name;
        }

        @Override
        public void run() {

            try {
                // 等待 CDL
                DEMAND_COUNT.await();
                System.out.println(name + "获取到了原型和PRD,开始开发.");
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println(name + "开发完毕.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                DEVELOP_COUNT.countDown();
            }
        }
    }
}

CyclicBarrier

1. 基本概念

CyclicBarrier 也是等待线程计数器递减至 0 后再执行后续操作。

不同于 CountDownLatch,CyclicBarrier 在所有线程完成计算任务(计数器归零)时,会触发内部的回调函数执行额外的操作。

  • CountDownLatch 一旦计数器减为零,就无法再次使用,适用于一次性的等待任务。计数器归零后,无法重置或重新使用。
  • CyclicBarrier 更具灵活性。当 CyclicBarrier 的计数器归零时,可以通过reset()方法重新设置计数器,使其可以在后续的同步点中再次使用

定义方式:

不携带回调:

CyclicBarrier barrier = new CyclicBarrier(2);

携带回调(重写 Runnable 接口的 run 方法):

通过匿名内部类(实例)创建了一个 Runnable 接口的对象

CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
    @Override
    public void run() {

    }
});

回调 是指当所有线程都到达屏障点(即调用 await() 的线程数量达到了 CyclicBarrier 设置的阈值)时,会触发一个额外的操作,这个操作通常是一个 Runnable 任务。这个 Runnable 任务会在所有线程继续执行之前被优先执行。

Cyclic Barrier 不需要 countDown() 方法

2. 场景实例

现在公司有一个需求,需要 4 名程序员在获取到产品经理的原型和 PRD 后才能开始开发,开发完成需要 1 名测试完成常规的测试后再安排 1 名运维上线服务。

public class DevelopAndTestCountDownLatchTest {

    private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

    /**
     * 产品经理
     */
    private static final CyclicBarrier PRD_COUNT = new CyclicBarrier(1, new StartDevelop());
    /**
     * 开发人员
     */
    private static final CyclicBarrier DEVELOP_COUNT = new CyclicBarrier(4, new TestCode());

    /**
     * 测试人员
     */
    private static final CyclicBarrier TEST_COUNT = new CyclicBarrier(1, new OperationTopLineCode());



    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {

        System.out.println("产品经理此时正在紧张的设计原型和PRD.....");
        TimeUnit.SECONDS.sleep(3);
        System.out.println("原型设计完毕.");
        PRD_COUNT.await();
    }


    /**
     * 产品经理资料准备齐全后的回调
     */
    private static class StartDevelop implements Runnable {

        @Override
        public void run() {
            EXECUTOR.execute(new DevelopCode("java小红"));
            EXECUTOR.execute(new DevelopCode("java小绿"));
            EXECUTOR.execute(new DevelopCode("java小蓝"));
            EXECUTOR.execute(new DevelopCode("java小紫"));
        }
    }


    /**
     * 开发人员开始进行开发代码
     */
    private static class DevelopCode implements Runnable {
        private final String name;

        private DevelopCode(String name) {
            this.name = name;
        }

        @Override
        public void run() {

            try {
                System.out.println(name + "开始开发代码.......");
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println(name + "完成了代码开发!");
                //等待其他人完成开发
                DEVELOP_COUNT.await();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

    /**
     * 测试人员的测试任务
     */
    private static class TestCode implements Runnable {
        @Override
        public void run() {
            try {
                System.out.println("开发人员全部都开发完成了,测试人员开始测试.");
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("测试人员完成测试,服务没有问题,可以准备上线了.");
                TEST_COUNT.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 运维上线代码
     */
    private static class OperationTopLineCode implements Runnable{

        @Override
        public void run() {

            try {
                System.out.println("检测到测试完成,运维开始上线代码");
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("上线完成");
                //上线完成后关闭线程池
                EXECUTOR.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Semaphore

中文信号量,允许多个线程在同一时刻访问共享资源,但限制了可以同时访问资源的线程的数量:维护一个计数器,该计数器表示可用的许可证数量,线程在访问资源前必须先获取许可证。如果许可证数量耗尽,后续的线程必须等待其他线程释放许可证,以便获得访问权。

  • aquire():试图获得许可证
  • release():释放获得的许可证
  • 其他方法可获 / 释放得多个许可证 / 设置等待时间等

当需要对线程访问资源的并发数量和速率进行精确控制时,Semaphore提供了一种有效的解决方案。在实际应用场景中,我们可以将Semaphore用于实现令牌桶算法,确保请求只有在成功获取令牌的情况下才能被转发到服务上。这种方式不仅有效地控制了并发访问速率,还可以防止资源的过度消耗,保护服务的稳定性和可用性。

Condition

1. 基本概念

一种线程间的通信机制,和 wait / notify 相似,Conditionawait()Objectwait() 在被唤醒后都可以继续执行下一行代码

Condition通常与 ReentrantLockReentrantReadWriteLock 配合使用,用于管理线程的等待和通知机制。Condition允许线程在满足特定条件之前等待,以实现更复杂的同步控制

  • await():必须在 lock() 加锁块中使用,可以被中断,等待其他线程的唤醒
  • signal():公平锁的情况下,唤醒一个等待时间最长的线程。非公平锁的情况下,不会遵循先进先出的唤醒顺序。非公平锁更加倾向于允许刚刚阻塞的线程立即获得锁,而不考虑等待时间的长短。
  • signalAll():唤醒所有条件关联的线程
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

lock.lock();
try {
    condition.await(); / condition.signal();
} finally {
    lock.unlock();
}

2. 场景实例

假设有一个停车场,该停车场有 4 个停车位。车辆可以进入到停车场停车,如果当前停车位已满,车辆就需要等待停车位。

public class ParkingLot {
    private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

    private final static ReentrantLock LOCK = new ReentrantLock();
    private final static Condition CONDITION = LOCK.newCondition();

    /**
     * 停车场总位置数量
     */
    private final int totalParkingSpaces;
    /**
     * 已经停了多少量
     */
    private int occupiedSpaces = 0;

    public ParkingLot(int totalParkingSpaces) {
        this.totalParkingSpaces = totalParkingSpaces;
    }

    public static void main(String[] args) {
        ParkingLot parkingLot = new ParkingLot(5);
        for (int i = 0; i < 10; i++) {
            EXECUTOR.execute(new CarActive(parkingLot, "车辆"+i));
        }
    }

    /**
     * 尝试进入停车场
     */
    public void park(String name){
        LOCK.lock();
        try {
            if(occupiedSpaces >= totalParkingSpaces){
                // 如果停车场已满,等待
                System.out.println(name + ": 车辆等待停车位...");
                // 开始等待车位
                CONDITION.await();
            }
            occupiedSpaces++;
            System.out.println(name + ": 车辆成功停车,剩余的停车位:" + (totalParkingSpaces - occupiedSpaces));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            LOCK.unlock();
        }
    }

    /**
     * 驶离停车场
     */
    public void bearOff(String name){
        LOCK.lock();
        try {
            // 离开停车场  将已占用的数量-1
            occupiedSpaces--;
            System.out.println(name + ": 车辆离开停车场,剩余停车位: " + (totalParkingSpaces - occupiedSpaces));
            // 通知等待的车辆有空位了
            CONDITION.signal();
        }finally {
            LOCK.unlock();
        }
    }

    private static class CarActive implements Runnable {

        private final ParkingLot parkingLot;
        private final  String name;

        private CarActive(ParkingLot parkingLot, String name) {
            this.parkingLot = parkingLot;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                parkingLot.park(name);
                Thread.sleep((long) (Math.random() * 10000));
                parkingLot.bearOff(name);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}