跳转至

Java并发理论

线程的栈和线程的本地内存

在栈中的变量(局部变量、方法定义的参数、异常处理的参数)不会在线程之间共享,也就不会有内存可见性的问题,也不受内存模型的影响。而在堆中的变量是共享的,一般称之为共享变量。(内存模型影响的是堆中的共享变量)

所以,内存可见性针对的是堆中的共享变量

线程的本地内存并不是同一层次的概念(参考 Cache Coherence):

  • 线程的本地内存是一个包括内存和缓存的概念,会有一个 CPU/缓存 取指和运算的过程(本质上是寄存器的取指令和运算过程)
  • 线程的栈用于存储线程的本地变量的运算结果,其计算过程是通过线程的本地内存实现的

注意 Cache 的 write-back 策略

JMM

Java内存模型规定所有的变量都存储在主内存中,包括实例变量,静态变量,但是不包括局部变量和方法参数。每个线程都有自己的工作内存,线程的工作内存保存了该线程用到的变量和主内存的副本拷贝,线程对变量的操作都在工作内存中进行线程不能直接读写主内存中的变量

无论是局部变量还是全局变量,都通过本地内存支持

JMM 定义了:

原子性

int i = 2; // 保证原子性(基本类型的赋值操作,JMM保证原子性)

int j = i; // 不保证(先读后赋,2步)

i++; // 不保证(先读再加再赋,3步)

i = i + 1; // 不保证(3步)

可见性

当一个线程修改共享变量的值是否能立刻被其他变量知道即称为可见性,可见性要求另一个线程读取这个共享变量时共享变量已经被写入到了主存中。

  • volatile:变量被修改后立刻被刷新到内存中,其他线程可以立刻在内存中读取到新的变量值

    但是 volatile 不保证原子性:当 i 读到CPU寄存器中后可能立即切换

  • synchronized:在 unlock 之前保证改变的内容会被写入到主存中
  • final

有序性

指令执行的有序性

  • volatile:使用内存屏障禁止指令重排
  • synchronized:被包住的代码块在线程间的执行是串行的

并发线程基础

线程状态转换

image-20241214200929604

  • new :被创建,但是没有调用 start()
  • runnable:java 将操作系统中的就绪和运行统称为 runnable
  • blocked:线程阻塞于锁(synchronized 方法/代码块)
  • waiting:线程等待,需要其他线程唤醒(包括阻塞在并发包中 Lock 接口的状态,因为 Lock 接口的阻塞使用 LockSupport 类方法)
  • time_waiting:在指定时间后自动返回
  • terminated:当前线程已经执行完毕

线程的构造方式

1.实现 Runnable 接口

不同的 Thread 传入相同的 My 实例后,如果 My 中有同步方法(锁是 My 实例),则不同的 Thread 使用相同的锁

public class My implements Runnable {
    @Override
    public void run() {
        ///
    }
}

public static void main(String[] args)
{
    My my = new My();
    Thread t = new Thread(my);
    t.start();
}

2. 实现 Callable 接口

先将 实现的 Callable 实例传入 FutureTask,随后将 FutureTask 实例传入 Thread ,启动后可以通过 FutureTask 实例调用 get() 查看结果

public class My implements Callable<Integer> {
    public Integer call()
    {
        return 123;
    }
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
    My my = new My();
    FutureTask<Integer> ft = new FutureTask<>(my);
    Thread t = new Thread(ft);
    t.start();
    System.out.print(ft.get());
}

3. 继承 Thread 类

比较:

实现接口更好,因为:

  1. Java 不支持多继承,继承了 Thread() 类就无法继承其他类了
  2. Thread 类中有一些字段不会用到,子类继承父类之后在构建对象时会同时分配父类字段占用的内存,开销过大

Lambda 表达式实现

Thread threadA = new Thread(() -> {
    example.waitMethod();
});

等价于:

Thread threadA = new Thread(new Runnable() {
    @Override
    public void run() {
        example.waitMethod();
    }
});

线程的中断和终止

  • 当一个处于休眠状态的线程被调用 interrupt() 方法时,线程会立即抛出 InterruptedException,并从休眠中唤醒。
  • 线程的中断状态会被清除(设置为 false)。
  • 线程会恢复执行,继续执行捕获异常后的代码。

基于中断的停止模式:A 线程如果想停止 B 线程,向 B 发出一个中断信号后最后仍然由 B 决定是否停止(立刻 / 延后 / 忽略)

线程间通信

  1. Volatile:A / B 线程使用同一个普通共享变量时,可能存在可见性问题。使用 Volatile 保证:如果 A 在 B 开始之前完成了修改,则 B 一定可以读到修改后的值
  2. Synchronized:一段时间内只有一个线程可以处于同步方法或者同步块中
  3. wait / notify:被定义在 Object 类上,对于调用同一个对象上的 wait() / notify() 进行控制。wait() 可以传入时间进入 time_wait 状态,也可以直接进入 wait 状态。获取到锁后(syn方法 / 块)才能调用 wait() ,调用 wait() 后会释放对象的锁
  4. join():A 调用 B.join() 表示 A需要等待 B 完全执行完成才会从 B.join() 后继续执行。同时也支持传入参数,即调用 join() 后 A 可以进入 wait 或 time_wait

ThreadLocal

1. 线程隔离性

  • 成员变量:属于对象级别的变量,多个线程共享同一个对象时,会共享该对象的成员变量。
  • ThreadLocal:属于线程级别的变量,每个线程都有自己独立的副本,即使它们操作的是同一个对象的 ThreadLocal 变量。
public class MyClass {
    private ThreadLocal<Integer> threadLocalValue = new ThreadLocal<>();

    public void setVal(int val)
    {
        threadLocalValue.set(value); // 使用 ThreadLocal 类的方法 set
    }

    public int getVal()
    {
        return threadLocalValue.get(); // 使用 ThreadLocal 类的方法 get
    }
}

作用于 MyClass 的每个线程调用 setVal 和 getVal 时都只会操作本线程的变量副本

2. 上下文传递特性

在日常的 Web 开发中,我们经常需要获取当前登录用户的信息和权限。如果不使用 ThreadLocal,我们需要从数据库或缓存中查询用户信息,然后通过方法参数层层传递,最终将用户信息传递到需要的地方。这种情况下,代码可能会变得冗长和复杂。

如果我们在请求开始的时候,比如在前置拦截器中获取请求头中的用户信息,将用户信息从 db 或者缓存中查询之后放到 ThreadLocal 中,那么在后续所有的方法中,都可以从该上下文中获取用户信息。

public class ThreadLocalParameterPassing {
    private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        InterceptorTest interceptorTest = new InterceptorTest();
        ControllerTest controllerTest = new ControllerTest();
        EXECUTOR.execute(() ->{
            try{
                //调用前置拦截拿到 User 对象(这里模拟了查询过程和 set context 过程)
                interceptorTest.beforeInterceptor(1);
                //执行controller逻辑
                controllerTest.deleteById(10);
            }finally {
                //调用后置拦截
                interceptorTest.afterInterceptor();
            }

        });
        EXECUTOR.execute(() ->{
            try{
                //调用前置拦截
                interceptorTest.beforeInterceptor(2);
                //执行controller逻辑
                controllerTest.deleteById(11);
            }finally {
                //调用后置拦截
                interceptorTest.afterInterceptor();
            }
        });
    }

}

class InterceptorTest {
    /**
     *
     * 前置拦截
     * 模拟从请求头中获取userId
     * @param userId 用户的名称
     */
    public void beforeInterceptor(int userId){
        //模拟查询数据库或者缓存
        User user;
        if(1 == userId) {
            user = new User("小红", userId, "北京市朝阳区");
        }else {
            user = new User("小绿", userId, "北京市海淀区");
        }
        UserContext.setUser(user);
    }

    /**
     * 后置拦截
     */
    public void afterInterceptor(){
        //删除本次线程缓存
        UserContext.removeUser();
    }
}

class ControllerTest {
    public void deleteById(Integer id) {
        ServiceTest serviceTest = new ServiceTest();
        serviceTest.deleteById(id);
    }
}

class ServiceTest {

    public void deleteById(Integer id) {
        User user = UserContext.getUser();

        try {
            System.out.println(user.getName() + "开始删除数据...");
            TimeUnit.SECONDS.sleep(2);
            System.out.println(user.getName() + "删除成功...");
            writeLog(true, "成功");
        } catch (InterruptedException e) {
            e.printStackTrace();
            writeLog(false, e.getMessage());
        }

    }

    public void writeLog(boolean success, String errorMessage){
        User user = UserContext.getUser();
        System.out.printf("开始记录日志:用户%s根据id删除了某个东西,结果是:%s, 信息是:%s%n", user.getName(), success, errorMessage);
    }
}

/**
 * 构建用户上下文
 */
class UserContext {
    private static final ThreadLocal<User> USER_CONTEXT = new ThreadLocal<>(); // 不设置为 static 首先下面的 static 方法就访问不了

    public static User getUser(){
        return USER_CONTEXT.get();
    }
    public static void setUser(User user){
        USER_CONTEXT.set(user);
    }
    public static void removeUser(){
        USER_CONTEXT.remove();
    }
}

@Data
class User {
    private String name;
    private Integer userId;
    private String address;

    public User(String name, Integer userId, String address) {
        this.name = name;
        this.userId = userId;
        this.address = address;
    }
}

当调用 USER_CONTEXT.get()USER_CONTEXT.set(user) 时,USER_CONTEXT 本身会被用作 key,去当前线程的 ThreadLocalMap 中查找或存储值。

ThreadLocal 的机制

  • 共享的 ThreadLocal 实例

    • ThreadLocal 实例本身是共享的,所有线程都使用同一个 ThreadLocal 实例。

      private static final ThreadLocal<User> userContext = new ThreadLocal<>();
      

      这里设置为了 final ,保证了线程安全

  • 每个线程独立的 ThreadLocalMap:(在 Thread 类中)

    • 每个线程内部都有一个私有的 ThreadLocalMap,用于存储线程本地变量。
    • ThreadLocalMap 是一个类似于 HashMap 的数据结构,但它只属于当前线程。
  • ThreadLocal 实例作为 键(key)

    • 当线程调用 ThreadLocal.get()ThreadLocal.set() 时,ThreadLocal 实例会被用作 key,去当前线程的 ThreadLocalMap 中查找或存储值。

      userContext.set(user); // 将 user 存储到当前线程的 ThreadLocalMap 中,key 是 userContext
      User user = userContext.get(); // 从当前线程的 ThreadLocalMap 中获取值,key 是 userContext
      
  • 线程的副本指的是键对应的 值(value)

    • 每个线程在调用 ThreadLocal.set() 时,会将值存储到自己的 ThreadLocalMap 中,键是 ThreadLocal 实例,值是用户设置的对象(例如 User)。
    • 每个线程的 ThreadLocalMap 中存储的值是独立的,因此每个线程都有自己独立的变量副本。

set() 方法的逻辑如下:

public void set(T value) {
    //获取当前的线程
    Thread t = Thread.currentThread();
    //从当前的线程中获取ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        map.set(this, value); // 这里的 this 指的是调用 set() 的 ThreadLocal 实例
    } else {
        // 创建线程的 map
    }
}

内存泄漏问题

ThreadLocalMap 中的 Entry(即 key-value 键值对)中的 key 是一个弱引用:

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

注意这里调用的 super 构造方法使 key 被传递到了弱引用的构造器中,而 value 被一个成员变量存储是一个强引用

ThreadLocalMap 的生命周期是根据 Thread 的生命周期来的,如果线程运行完就销毁,那么内存泄漏也不存在。但是,事实上我们生产环境中,不可能不使用线程池,线程池创建的核心线程基本都是常驻线程,常驻线程的生命周期是永久性的,所以 ThreadLocalMap 中的数据只要你不删除,那么它就是永久性存在的。

为什么设计为弱引用?

我们在调用 set/get/remove/rehash 任意一个方法,ThreadLocal 都会验证 key 是否为 null,如果确实是 key 为 null,则将 value 也设置为 null。这样 value 的强引用就被断开了,value 就会被 JVM 回收。

注意,这里 Thread 内部的 Map 中的 key 是对 ThreadLocal 变量的引用,这个对象要被回收肯定需要先等外部的 ThreadLocal 对象的引用变量失效才行(这里可能需要进一步了解 JVM 的垃圾回收机制)

弱引用不能完全解决泄露(虽然 key 是弱引,但是 val 是强引用)

如果不调用 remove 等方法,会导致 null->value 键值对即使已经失去了意义,也不会被释放

注意,即使我们不使用线程池也绕不开这个问题,不主动使用线程池但是所用的 Tomcat 里面用的有线程池,一个请求被分发到 controller 这个过程其实就对应着一个 Tomcat 线程池中的线程执行任务的过程。

  • Spring MVC 的 Controller 层是通过 Servlet 实现的,而 Servlet 是由 Tomcat 管理的。
    • 因此,Controller 层的线程是由 Tomcat 控制的。
  • Spring 的 ServiceMapper 层是同步调用的
    • 在默认情况下,ServiceMapper 层的调用是同步的,运行在同一个线程中。
    • 因此,ServiceMapper 层的线程也是由 Tomcat 控制的。

在默认情况下,整个程序的运行(包括 ControllerServiceMapper 层)都是由 Tomcat 控制的,前提是你没有显式地使用异步处理或自定义线程池。

CAS

Compare And Swap

优势与劣势

优势:

  • 高效:CAS操作是硬件级别的原子操作,无需加锁,比 synchronized 等锁机制更高效
  • 避免死锁:无需获取锁来执行操作
  • 高并发性:允许多个线程同时尝试,但只有一个成功,其他会重试或执行其他操作
  • 原子性:要成功执行完操作,要么失败,没有中间中间状态(硬件保证)

劣势:

  • 自旋:操作失败可能会不断自旋,浪费 CPU(需要谨慎设置自旋次数的上限)
  • ABA:不关心值的变化过程,可以使用版本号解决

原理

  1. 读取操作:CAS 操作首先读取内存位置的当前值,这是基于硬件提供的原子性操作。这个值将被用于后续比较和更新步骤。
  2. 比较操作:CAS 会将读取的当前值与预期值(也称为期望值)进行比较。如果当前值等于预期值,则说明没有其他线程在读取或修改这个内存位置的数据,此时 CAS 操作可以继续执行。
  3. 更新操作:如果比较操作成功(当前值等于预期值),CAS 会使用新值来更新内存位置的内容。这个更新操作是原子的,操作系统确保了不会存在多个线程同时修改这个内存位置的值。
  4. 失败和重试:如果比较操作失败(当前值不等于预期值),CAS 会返回一个失败标志,表明其他线程已经修改了内存位置的值。在这种情况下,可以选择重试 CAS 操作,或者采取其他策略来解决竞态条件问题。

AtomicXXX 原理

Atomic 类主要通过 CAS 来实现。

以 AtomicInteger 为例:

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

如果在 c++ 实现的 native 方法 compareAndSwap 实现之前有其他线程成功修改了 v ,那么会再次读取并尝试修改(自旋)