JAVA_多线程_总结


多线程

硬件支持

在硬件层面,围绕多线程的使用和支持,计算机做了下面几种处理:

  1. CPU 增加了缓存,以均衡与内存的速度差异。—》会导致可见性问题
  2. 操作系统增加了进程,线程,以分时复用 CPU ,进而均衡 CPU 和 IO 设备的速度差异。—》会导致原子性问题
  3. 编译程序优化了指令执行顺序,使得缓存能够得到更加合理地利用。—》会导致有序性问题

CPU 缓存引起的可见性问题

// 线程A 对应 CPU1
int i = 0;
i = 10;
// 线程B 对应 CPU2
j = i;

当线程A 执行 i = 10 这段代码的时候,会先把初始值 0 加载到 CPU1 的高速缓存中,然后赋值为 10,那么 CPU1 的高速缓存中的 i 的值就是 10,这个时候数值还没写入到内存之中。

此时线程B 执行 j = i 的时候,它会先去内存中读取 i 的值并加载到 CPU2 的缓存中,但这个时候由于内存中 i 的值还是 0,因此 j = 0。

  • 解决

JAVA 中解决可见性问题,使用的是 volatile 关键字。当一个共享变量被 volatile 修饰的时候,它会保证修改的值会立即更新到内存,当其它线程需要读取时,它会去内存中读取值。(普通的共享变量不能保证

除此之外 synchronized 和 Lock 也能保证可见性,synchronized 和 Lock 能保证同一时刻线程获取锁后执行同步代码,并且释放锁之前会将变量的修改刷新到内存之中。

操作系统引起的原子性问题

int i = 1;
//线程A
i += 1;
//线程B
i += 1;

这里 i += 1 需要三条 CPU 指令去完成。执行过程如下:

  1. 将变量 i 从内存中读取到 CPU 寄存器。
  2. 在 CPU 寄存器中执行 i + 1 操作。
  3. 将最后的结果写入内存。(缓存机制可能会使得数据写入 CPU 的缓存而不是内存)

由于 CPU 分时复用(线程切换)的存在,线程A 执行第一条指令后,就切换到线程B 执行,假设线程B 执行完成之后在切换到线程A 继续执行后续两条指令,将会导致写入内存的值是 2 而不是 3。

  • 解决

使用 synchronized 和 Lock 来解决,synchronized 和 Lock 能保证同一时刻只有一个线程执行该代码块。

指令重排引起的有序性问题

int i = 0;
boolean flag = false;
// 语句1
i = 1;
// 语句2
flag = true;

思考一个问题:在程序执行的时候,语句1 和语句2 会按照顺序执行吗?

不会。因为在执行程序的时候为了提高性能,编译器和处理器会对指令进行重排。指令重排分为下面三种:

  1. 编译器指令重排,编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
  2. 指令集的重排,处理器会采用指令集并行技术来讲多条指令重叠执行,如果不存在数据依赖性,处理器会改变语句对应机器指令的执行顺序。
  3. 内存系统的重排,由于处理器使用缓存和读写缓冲区,这就使得加载和存储操作看上去可能是乱序执行。
  • 解决

使用 synchronized 和 Lock 来解决,synchronized 和 Lock 能保证同一时刻只有一个线程执行该代码块。

线程安全

一个类在可以被多个线程安全调用时就是线程安全的,线程安全不是一个非真即假的命题,可以根据安全程度进行下列的划分:不可变、绝对线程安全、相对线程安全、线程兼容和线程对立。

不可变

不可变的对象一定是线程安全的。

不可变的类型:

  1. final 关键字修饰的基本数据类型
  2. String 对象
  3. 枚举类型
  4. Number 部分子类,但 Number 的原子类 AtomicInteger 和 AtomicLong 是可变的

绝对线程安全

不管运行时环境如何,调用者都不需要任何额外的同步措施。

相对线程安全

相对线程安全需要保证对这个对象单独的操作是线程安全的,在调用的时候不需要做额外的保障措施。但是对于一些特定顺序的连续调用,就可能需要在调用端使用额外的同步手段来保证调用的正确性。

线程兼容

线程兼容是指对象本身并不是线程安全的,但是可以通过在调用端正确地使用同步手段来保证对象在并发环境中可以安全地使用。

线程对立

线程对立是指无论调用端是否采取了同步措施,都无法在多线程环境中并发使用的代码。

线程安全的实现

互斥同步

synchronized 和 ReentrantLock来实现。

非阻塞同步

  1. CAS
  2. 原子类 AtomicInteger
  3. ABA (可以通过 AtomicStampedReference 类来解决)

无同步方案

如果一个方法本来就不涉及共享数据,那它自然就无须任何同步措施去保证正确性。

  • 栈封闭

多个线程访问同一个方法的局部变量时,不会出现线程安全问题,因为局部变量存储在虚拟机栈中,属于线程私有的。

  • 线程本地存储

如果可以把共享的数据限制在同一个线程之内,这样就无须同步也能保证线程之间不出现数据竞争的问题。可以使用 ThreadLocal 来实现线程本地存储功能。

  • 可重入代码

在代码执行的任何时刻中断它,转而去执行另外一段代码(包括递归调用它本身),而在控制权返回后,原来的程序不会出现任何错误。

线程的概念

线程是程序执行时的最小单位,它是进程的一个执行流,是 CPU 调度和分派的基本单位,一个进程可以由很多个线程组成,线程间共享进程的所有资源,每个线程有自己的堆栈和局部变量。线程由CPU独立调度执行,在多 CPU 环境下就允许多个线程同时运行。同样多线程也可以实现并发操作,每个请求分配一个线程来处理。

线程的状态

  1. 新建:创建后尚未启动。
  2. 可运行:可能正在运行,也可能正在等待 CPU 时间片。
  3. 阻塞:等待获取一个排它锁,如果线程释放了锁就会结束此状态。
  4. 无限期等待:等待其它线程显示唤醒,否则不会被分配 CPU 时间片。
  5. 限期等待:无需等待其它线程显示地唤醒,在一定时间之后会被系统自动唤醒。
  6. 死亡:线程结束任务之后结束,或者产生异常结束。

实现方式

继承 Thread 类

public class MyThread extends Thread{

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("线程名:" + Thread.currentThread().getName() + "输出的结果:" + i);
        }
    }

    public static void main(String[] args) {
        MyThread myThread1 = new MyThread();
        MyThread myThread2 = new MyThread();
        MyThread myThread3 = new MyThread();
        myThread1.start();
        myThread2.start();
        myThread3.start();
    }
}

实现 Runnable 接口

public class MyRunnable implements Runnable{
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("线程名:" + Thread.currentThread().getName() + "输出的结果:" + i);
        }
    }

    public static void main(String[] args) {
        MyRunnable myRunnable = new MyRunnable();
        Thread thread1 = new Thread(myRunnable);
        Thread thread2 = new Thread(myRunnable);
        Thread thread3 = new Thread(myRunnable);
        thread1.start();
        thread2.start();
        thread3.start();
    }
}

实现 Callable 接口

public class MyCallable implements Callable {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 0; i < 10; i++) {
            System.out.println("线程名:" + Thread.currentThread().getName() + "输出的结果:" + i);
            sum++;
        }
        return sum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyCallable myCallable = new MyCallable();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(myCallable);
        Thread thread = new Thread(futureTask);
        thread.start();
        Integer sum = futureTask.get();
        System.out.println(sum);
    }
}

使用 ExecutorService 实现

public class Executor implements Callable {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 0; i < 10; i++) {
            System.out.println("线程名:" + Thread.currentThread().getName() + "输出的结果:" + i);
            sum++;
        }
        return sum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Executor executor = new Executor();
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Future submit = executorService.submit(executor);
        Integer sum = (Integer) submit.get();
        System.out.println(sum);
    }
}

## JAVA中的锁

乐观锁和悲观锁

  • 乐观锁

对于同一个数据的并发操作,乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其他线程更新,则根据不同的实现方式执行不同的操作(例如报错或者自动重试)。底层采用 CAS 算法实现。

实现

// 乐观锁
private AtomicInteger atomicInteger = new AtomicInteger();  // 需要保证多个线程使用的是同一个AtomicInteger
atomicInteger.incrementAndGet(); //执行自增1
  • 悲观锁

对于同一个数据的并发操作,悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。例如:synchronized 关键字和 Lock 的实现类都是悲观锁。

实现

// synchronized
public synchronized void testMethod() {
    // 操作同步资源
}

// ReentrantLock
private ReentrantLockLock= new ReentrantLock(); // 需要保证多个线程使用的是同一个锁
public void modifyPublicRes() {
    lock.lock();
    // 操作同步资源
    lock.unlock();
}

自旋锁

  • 概念

阻塞或唤醒一个 Java 线程需要操作系统切换 CPU 状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长。

在很多场景下,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失。如果物理机器有多个处理器,能够让两个或以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃CPU的执行时间,看看持有锁的线程是否很快就会释放锁。

而为了让当前线程“稍等一下”,我们需让当前线程进行自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。

  • 优缺点

自旋锁不能代替阻塞。自旋等待虽然避免了线程切换的开销,但它要占用处理器时间。如果锁被占用的时间很短,自旋等待的效果就会非常好。反之,如果锁被占用的时间很长,那么自旋的线程只会白浪费处理器资源。所以,自旋等待的时间必须要有一定的限度,如果自旋超过了限定次数(默认是 10 次,可以使用 -XX:PreBlockSpin 来更改)没有成功获得锁,就应当挂起线程。

底层的实现原理同样也是 CAS,AtomicInteger 中调用 unsafe 进行自增操作的源码中的 do-while 循环就是一个自旋操作,如果修改数值失败则通过循环来执行自旋,直至修改成功。

公平锁和非公平锁

  • 公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。

公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU 唤醒阻塞线程的开销比非公平锁大。

  • 非公平锁

非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。

非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU 不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。

可重入锁

可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者类),不会因为之前已经获取过还没释放而阻塞。Java 中ReentrantLock 和 synchronized 都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。

例子

public class Widget {
    public synchronized void doSomething() {
        System.out.println("方法1执行...");
        doOthers();
    }

    public synchronized void doOthers() {
        System.out.println("方法2执行...");
    }
}

同一个线程在调用 doOthers() 时可以直接获得当前对象的锁,进入 doOthers() 进行操作。

独享锁和共享锁

  • 独享锁

独享锁也叫排他锁,是指该锁一次只能被一个线程所持有。如果线程A 对数据A 加上排它锁后,则其他线程不能再对数据A 加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。Javd 中的 synchronized 和 Lock 的实现类就是互斥锁。

  • 共享锁

共享锁是指该锁可被多个线程所持有。如果线程A 对数据A 加上共享锁后,则其他线程只能对数据A 再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。

独享锁与共享锁是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。

sychronized

对象锁

对象锁一般包含两种形式:synchronized (this) 和 synchronized 修饰普通方法。

  • synchronized (this)
public class SynchronizedDemo implements Runnable {

    private static SynchronizedDemo synchronizedDemo = new SynchronizedDemo();

    @Override
    public void run() {
        // 同步代码块形式——锁为this,两个线程使用的锁是一样的,线程1必须要等到线程0释放了该锁后,才能执行
        synchronized (this) {
            System.out.println("我是线程" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "结束");
        }
    }

    public static void main(String[] args) {
        Thread t1 = new Thread(synchronizedDemo);
        Thread t2 = new Thread(synchronizedDemo);
        t1.start();
        t2.start();
    }
}

结果

我是线程Thread-0
Thread-0结束
我是线程Thread-1
Thread-1结束

通过代码和运行结果可以看出线程 t1 和线程 t2 使用的是同一个对象锁(SynchronizedDemo 对象的锁),那么线程 t2 就必须在线程 t1 运行之后再获取资源。

  • synchronized 修饰普通方法
public class SynchronizedDemo implements Runnable {

    private static SynchronizedDemo synchronizedDemo = new SynchronizedDemo();

    @Override
    public void run() {
        method();

    }

    public synchronized void method() {
        System.out.println("我是线程" + Thread.currentThread().getName());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "结束");
    }

    public static void main(String[] args) {
        Thread t1 = new Thread(synchronizedDemo);
        Thread t2 = new Thread(synchronizedDemo);
        t1.start();
        t2.start();
    }
}

结果

我是线程Thread-0
Thread-0结束
我是线程Thread-1
Thread-1结束

synchronized 修饰普通方法,默认锁对象是 this。

类锁

先把上面两个对象锁的代码简单改造一下,看看运行结果。

  • synchronized (this) 加类锁
public class SynchronizedDemo implements Runnable {

    private static SynchronizedDemo synchronizedDemo1 = new SynchronizedDemo();
    private static SynchronizedDemo synchronizedDemo2 = new SynchronizedDemo();

    @Override
    public void run() {
        // 同步代码块形式——锁为this,两个线程使用的锁是一样的,线程1必须要等到线程0释放了该锁后,才能执行
        synchronized (this) {
            System.out.println("我是线程" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "结束");
        }
    }

    public static void main(String[] args) {
        Thread t1 = new Thread(synchronizedDemo1);
        Thread t2 = new Thread(synchronizedDemo2);
        t1.start();
        t2.start();
    }
}

结果

我是线程Thread-0
我是线程Thread-1
Thread-0结束
Thread-1结束
  • synchronized 修饰普通方法加类锁
public class SynchronizedDemo implements Runnable {

    private static SynchronizedDemo synchronizedDemo1 = new SynchronizedDemo();
    private static SynchronizedDemo synchronizedDemo2 = new SynchronizedDemo();

    @Override
    public void run() {
        method();

    }

    public synchronized void method() {
        System.out.println("我是线程" + Thread.currentThread().getName());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "结束");
    }

    public static void main(String[] args) {
        Thread t1 = new Thread(synchronizedDemo1);
        Thread t2 = new Thread(synchronizedDemo2);
        t1.start();
        t2.start();
    }
}

结果

我是线程Thread-0
我是线程Thread-1
Thread-0结束
Thread-1结束

类锁的实现一般包含两种形式,一种是 synchronized (对象) 和 synchronized 修饰静态方法

  • synchronized (对象)
public class SynchronizedDemo implements Runnable {

    private static SynchronizedDemo synchronizedDemo1 = new SynchronizedDemo();
    private static SynchronizedDemo synchronizedDemo2 = new SynchronizedDemo();

    @Override
    public void run() {
        // 同步代码块形式——锁为this,两个线程使用的锁是一样的,线程1必须要等到线程0释放了该锁后,才能执行
        synchronized (SynchronizedDemo.class) {
            System.out.println("我是线程" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "结束");
        }
    }

    public static void main(String[] args) {
        Thread t1 = new Thread(synchronizedDemo1);
        Thread t2 = new Thread(synchronizedDemo2);
        t1.start();
        t2.start();
    }
}

结果

我是线程Thread-0
Thread-0结束
我是线程Thread-1
Thread-1结束
  • synchronized 修饰静态方法
public class SynchronizedDemo implements Runnable {

    private static SynchronizedDemo synchronizedDemo1 = new SynchronizedDemo();
    private static SynchronizedDemo synchronizedDemo2 = new SynchronizedDemo();

    @Override
    public void run() {
        method();

    }

    public static synchronized void method() {
        System.out.println("我是线程" + Thread.currentThread().getName());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "结束");
    }

    public static void main(String[] args) {
        Thread t1 = new Thread(synchronizedDemo1);
        Thread t2 = new Thread(synchronizedDemo2);
        t1.start();
        t2.start();
    }
}

结果

我是线程Thread-0
Thread-0结束
我是线程Thread-1
Thread-1结束

JVM 中锁的优化

synchronized 的底层是两个字节码 monitorenter 和 monitorexit ,这两个字节码指令会让对象在执行的时候,使计数器做加 1 和 减 1 操作,但这两个字节码的执行是依赖于底层操作系统 Mutex Lock 来实现的,而 Mutex Lock 操作非常昂贵,因此在 JDK1.6 中开始对锁的实现引入了大量的优化。

主要的优化有:

同时在 JDK1.6 中,synchronized 同步锁一共有四种状态,分别是:无锁 → 偏向锁 → 轻量级锁 → 重量级锁。它会随着竞争逐渐升级,并且这个过程不可逆。

自旋锁

在之前的 synchronized 实现中,当多线程竞争锁时,当一个线程获取锁时,它会阻塞所有正在竞争的线程,这样对性能带来了极大的影响。并且挂起线程和恢复线程的操作都需要转入内核态中完成,这些操作对系统的并发性能带来了很大的压力。同时在很多情况下,共享数据的锁定状态只会持续很短的一段时间,为了这段时间去挂起和回复阻塞线程并不值得。

自旋锁在 JDK1.4 中是默认关闭的,在 JDK1.6 中是默认开启的,自旋锁是让没有获取到锁的线程等待一段时间,并且这段时间内不放弃 CPU 的执行时间,其中默认自旋次数为 10 次。

自适应自旋锁

为了避免线程锁在线程自旋刚结束就释放掉了锁,JDK1.6 引入了自适应自旋锁,它是由前一次在同一个锁上的自旋 时间及锁的拥有者的状态来决定的。如果在同一个锁对象上,自旋等待刚刚成功获取过锁,并且持有锁的线程正在运行中,那么 JVM 会认为该锁自旋获取到锁的可能性很大,会自动增加等待时间。相反,如果对于某个锁自旋很少成功获取锁。那以后要获取这个锁时将可能省略掉自旋过程,以避免浪费处理器资源。

锁粗化

在加同步锁时,尽可能的将同步块的作用范围限制到尽量小的范围(只在共享数据的实际作用域中才进行同步)但如果在连串的一系列操作都对同一个对象反复加锁和解锁,JVM 检测到这样一连串的操作都是对同一个对象加锁,那么 JVM 会将加锁同步的范围扩展(粗化)到整个一系列操作的外部,及只需要加锁一次就可以了。

锁消除

JVM 会判断程序中的同步明显会不会逃逸出去从而被其他线程访问到,如果不会那 JVM 就把它们当作栈上数据对待,认为这些数据是线程独有的,不需要加同步。此时就会进行锁消除。

轻量级锁

轻量级锁是相对于重量级锁而言的。使用轻量级锁时,不需要申请互斥量,仅仅将 Mark Word 中的部分字节 CAS 更新指向线程栈中的 Lock Record,如果更新成功,则轻量级锁获取成功,记录锁状态为轻量级锁;否则,说明已经有线程获得了轻量级锁,目前发生了锁竞争(不适合继续使用轻量级锁),接下来膨胀为重量级锁。

偏向锁

当一个线程访问加了同步锁的代码块时,会在对象头中存储当前线程的 ID后续这个线程进入和退出这段加了同步锁的代码块时,不需要再次加锁和释放锁。而是直接比较对象头里面是否存储了指向当前线程的偏向锁。如果相等表示偏向锁是偏向于当前线程的,就不需要再尝试获得锁了。

volatile

实例化一个对象其实可以分为三个步骤:

  • 分配内存空间。
  • 初始化对象。
  • 将内存空间的地址赋值给对应的引用。

但是由于操作系统可以对指令进行重排,所以上面的过程就可能发生变化,在多线程环境下就可能将一个未初始化的对象引用暴露出来,从而导致不可预料的结果。为了防止这个过程的重排序,可以将变量设置成 volatile 类型的变量可。

虚拟机为防止指令重排使用内存屏障技术来实现。

volatile 特性

  • volatile 不能保证完全的原子性,只能保证单次的读/写操作具有原子性。如 i++ 这样的操作。采用 volatile 修饰时并不能保证原子性,如果需要,可以使用 AtomicInteger 或 synchronized 来操作。

i++ 其实是一个复合操作,主要步骤为:

  • 读取i的值。
  • 对i加1。
  • 将i的值写回内存。

final

修饰类

当类的定义为 final 的时候,表明这个类不能被继承,同时这个类中所有的方法都隐式为 final。

修饰方法

  • private 方法是隐式的 final
  • final 方法是可以被重载的

修饰参数

当参数声明为 final 的时候,意味着方法中无法更改参数引用指向的对象。常用于匿名内部类传递参数。

修饰变量

  • 变量的值不能再被更改
  • static final 修饰变量的时候,必须在定义的时候赋值。

重排序

在多线程的环境下,为了保证线程的安全,final 采用了重排序的指令操作。包括下面几种方式:

final 修饰基本数据类型

  • 写操作:禁止 final 域写与构造方法重排序,即禁止 final 域写重排序到构造方法之外,从而保证该对象对所有线程可见时,该对象的 final 域全部已经初始化过。
  • 读操作:禁止初次读对象的引用与读该对象包含的 final 域的重排序。

final 修饰引用数据类型

  • 额外的约束:禁止在构造函数对一个 final 修饰的对象的成员域的写入与随后将这个被构造的对象的引用赋值给引用变量 重排序。

CAS

基础

CAS 是一条 CPU 的原子指令,作用是让 CPU 先进行比较两个值是否相等,然后原子地更新某个位置的值。(CAS 操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较下在旧值有没有发生变化,如果没有发生变化,才交换成新值,发生了变化则不交换。)

CAS 是乐观锁,使用的时候会存在如下几个问题。

  • ABA 问题

如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。

解决思路:

  1. 使用版本号,在变量前追加版本号,每次变量更新的时候把版本号加一。
  2. 使用 AtomicStampedReference 来解决,这个类的 compareAndSet 方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
  • 循环时长开销大

如果 JVM 能支持处理器提供的 pause 指令,那么效率会有一定的提升。

  • 只能保证一个共享变量的原子操作

解决思路

  1. 使用 AtomicReference 类来保证引用对象之间的原子性。

延伸的原子类

  1. AtomicBoolean :原子更新布尔类型
  2. AtomicInteger :原子更新整型
  3. AtomicLong :原子更新长整型
  4. AtomicIntegerArray :原子更新整型数组里的元素
  5. AtomicLongArray :原子更新长整型数组里的元素
  6. AtomicReferenceArray :原子更新引用类型数组里的元素
  7. AtomicReference :原子更新引用类型
  8. AtomicStampedReference :原子更新引用类型, 内部使用Pair来存储元素值及其版本号
  9. AtomicMarkableReferce :原子更新带有标记位的引用类型
  10. AtomicIntegerFieldUpdater :原子更新整型的字段的更新器
  11. AtomicLongFieldUpdater :原子更新长整型字段的更新器
  12. AtomicStampedFieldUpdater :原子更新带有版本号的引用类型
  13. AtomicReferenceFieldUpdater :上面已经说过此处不在赘述

AOS

AQS 是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,如:ReentrantLock,Semaphore,ReentrantReadWriteLock 都是基于 AOS 实现的。

核心思想

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH 队列是一个虚拟的双向队列 (虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点 (Node) 来实现锁的分配。

AQS 使用一个 Int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。

资源的共享方式

  • Exclusive (独占)

只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁。(公平锁:按照线程在队列中的排队顺序,先到者先拿到锁;非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的。)

  • Share (共享)

多个线程可同时执行。

底层

底层的设计使用的是模板方法模式,如果需要自定义同步器一般方式为:

  1. 继承 AbstractQueuedSynchronizer 并重写指定的方法,将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

需要重写的模板方法:

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

ReentrantLock

  • ReentrantLock 实现了 Lock 接口。
public class ReentrantLock implements Lock, java.io.Serializable 
  • ReentrantLock 属性
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    // 同步队列
    private final Sync sync;
}
  • ReentrantLock 构造函数
public ReentrantLock() {
    // 默认非公平策略
    sync = new NonfairSync();
}

// 可以传递参数确定采用公平策略或者是非公平策略,参数为true表示公平策略,否则,采用非公平策略:
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
  • ReentrantLock 内部有三个内部类
// Sync
abstract static class Sync extends AbstractQueuedSynchronizer
// NonfairSync
static final class NonfairSync extends Sync
// FairSync
static final class FairSync extends Sync

Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 序列号
    private static final long serialVersionUID = -5179523762034025860L;

    // 获取锁
    abstract void lock();

    // 非公平方式获取
    final boolean nonfairTryAcquire(int acquires) {
        // 当前线程
        final Thread current = Thread.currentThread();
        // 获取状态
        int c = getState();
        if (c == 0) { // 表示没有线程正在竞争该锁
            if (compareAndSetState(0, acquires)) { // 比较并设置状态成功,状态0表示锁没有被占用
                // 设置当前线程独占
                setExclusiveOwnerThread(current); 
                return true; // 成功
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 当前线程拥有该锁
            int nextc = c + acquires; // 增加重入次数
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // 设置状态
            setState(nextc); 
            // 成功
            return true; 
        }
        // 失败
        return false;
    }

    // 试图在共享模式下获取对象状态,此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread()) // 当前线程不为独占线程
            throw new IllegalMonitorStateException(); // 抛出异常
        // 释放标识
        boolean free = false; 
        if (c == 0) {
            free = true;
            // 已经释放,清空独占
            setExclusiveOwnerThread(null); 
        }
        // 设置标识
        setState(c); 
        return free; 
    }

    // 判断资源是否被当前线程占有
    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    // 新生一个条件
    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class
    // 返回资源的占用线程
    final Thread getOwner() {        
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }
    // 返回状态
    final int getHoldCount() {            
        return isHeldExclusively() ? getState() : 0;
    }

    // 资源是否被占用
    final boolean isLocked() {        
        return getState() != 0;
    }

    /**
        * Reconstitutes the instance from a stream (that is, deserializes it).
        */
    // 自定义反序列化逻辑
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}  

image

NonfairSync

主要是实现了 Sync 类中抽象的 lock 方法,表示采用非公平策略获取锁。

// 非公平锁
static final class NonfairSync extends Sync {
    // 版本号
    private static final long serialVersionUID = 7316153563782823691L;

    // 获得锁
    final void lock() {
        if (compareAndSetState(0, 1)) // 比较并设置状态成功,状态0表示锁没有被占用
            // 把当前线程设置独占了锁
            setExclusiveOwnerThread(Thread.currentThread());
        else // 锁已经被占用,或者set失败
            // 以独占模式获取对象,忽略中断
            acquire(1); 
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

FairSyn

主要是实现了 Sync 类中抽象的 lock 方法,表示采用公平策略获取锁。

// 公平锁
static final class FairSync extends Sync {
    // 版本序列化
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        // 以独占模式获取对象,忽略中断
        acquire(1);
    }

    /**
        * Fair version of tryAcquire.  Don't grant access unless
        * recursive call or no waiters or is first.
        */
    // 尝试公平获取锁
    protected final boolean tryAcquire(int acquires) {
        // 获取当前线程
        final Thread current = Thread.currentThread();
        // 获取状态
        int c = getState();
        if (c == 0) { // 状态为0
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) { // 不存在已经等待更久的线程并且比较并且设置状态成功
                // 设置当前线程独占
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 状态不为0,即资源已经被线程占据
            // 下一个状态
            int nextc = c + acquires;
            if (nextc < 0) // 超过了int的表示范围
                throw new Error("Maximum lock count exceeded");
            // 设置状态
            setState(nextc);
            return true;
        }
        return false;
    }
}

ThreadLocal

ThreadLocal 是一个将在多线程中为每一个线程创建单独的变量副本的类; 当使用 ThreadLocal 来维护变量时, ThreadLocal 会为每个线程创建单独的变量副本, 避免因多线程操作共享变量而导致的数据不一致的情况。

例子:

// 定义
public class DateUtils {
    public static final ThreadLocal<DateFormat> df = new ThreadLocal<DateFormat>(){
        @Override
        protected DateFormat initialValue() {
            return new SimpleDateFormat("yyyy-MM-dd");
        }
    };
}
// 使用
DateUtils.df.get().format(new Date());

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize, //10,核心线程数,线程池中始终存活的线程数。
                          int maximumPoolSize, //30,最大线程数,线程池中允许的最大线程数,当线程池的任务队列满了之后可以创建的最大线程数。
                          long keepAliveTime, //最大线程数可以存活的时间,当线程中没有任务执行时,最大线程就会销毁一部分,最终保持核心线程数量的线程。
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, //100,一个阻塞队列,用来存储线程池等待执行的任务,均为线程安全。
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  1. 当接收到 30 个比较耗时的任务时,10 个核心线程数(corePoolSize)都在工作,剩余的 20 个去队列(workQueue)里排队。
  2. 这个线程池最多接收的任务:maximumPoolSize + workQueue。

工作机制

  1. 当线程数小于核心线程数(corePoolSize)时,创建线程。
  2. 当线程数大于等于核心线程数(corePoolSize),且任务队列(workQueue)未满时,将任务放入任务队列(workQueue)。
  3. 当线程数大于等于核心线程数(corePoolSize),且任务队列(workQueue)已满:若线程数小于最大线程数(maximumPoolSize ),创建线程;若线程数等于最大线程数(maximumPoolSize ),抛出异常,拒绝任务。

拒绝策略

  1. ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。(ThreadPoolExecutor 线程池默认策略)
  2. ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
  4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务。

源码

  • setCorePoolSize 方法

在运行期线程池使用方调用此方法设置 corePoolSize 之后,线程池会直接覆盖原来的 corePoolSize 值,并且基于当前值和原始值的比较结果采取不同的处理策略。

  1. 对于当前值小于当前工作线程数的情况,说明有多余的 worker 线程,此时会向当前 worker 线程发起中断请求以实现回收,多余的 worker 在下次的时候也会被回收;
  2. 对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的 worker 线程来执行队列任务。
  • setMaximumPoolSize 方法
  1. 首先是参数合法性校验。
  2. 然后用传递进来的值,覆盖原来的值。
  3. 判断工作线程是否是大于最大线程数,如果大于,则对空闲线程发起中断请求

我们一般使用的是 Spring 包装的类 ThreadPoolTaskExecutor。对应的源码里面也有写到 setCorePoolSize 和 setMaximumPoolSize 在运行的时候是可以修改的。

  • setQueueCapacity 方法

在 ThreadPoolTaskExecutor 方法之中,setQueueCapacity 直接对其赋值,如果也要实现和上面一样的效果,我们往下看看,发现有一个 createQueue 的方法,返回一个 BlockingQueue 的实现类,内部为 LinkedBlockingQueue 构造方法,在 LinkedBlockingQueue 源码之中,可以发现 capacity 对修饰为 final 的了。如果想要实现上述支持动态修改的模式,那么可以重写 LinkedBlockingQueue 方法,允许 capacity 为可以修改即可。然后在实现修改的时候,保证线程安全。

三种方式

  • newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}
  1. 线程池的线程数量达 corePoolSize 后,即使线程池没有可执行任务时,也不会释放线程。
  2. 线程池里的线程数量不超过 corePoolSize,这导致了 maximumPoolSize和 keepAliveTime 将会是个无用参数。
  3. 使用了无界队列, 所以 FixedThreadPool 永远不会拒绝, 即饱和策略失效。
  • newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  1. 初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行。
  2. 使用了无界队列, 所以 SingleThreadPool 永远不会拒绝, 即饱和策略失效。
  • newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}
  1. 在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务。
  2. 主线程调用 SynchronousQueue 的 offer() 方法放入任务, 倘若此时线程池中有空闲的线程尝试读取 SynchronousQueue 的任务, 即调用了SynchronousQueue 的 poll(),那么主线程将该任务交给空闲线程。
  3. 当线程池为空或者没有空闲的线程,则创建新的线程执行任务。
  4. 执行完任务的线程倘若在 60s 内仍空闲, 则会被终止,因此长时间空闲的CachedThreadPool 不会持有任何线程资源。

深入理解

为何线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式?

其中最主要的原因是:在 Executors 的方法内存在下面的风险:

  • newFixedThreadPool 和 newSingleThreadExecutor

主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至内存溢出。

  • newCachedThreadPool 和 newScheduledThreadPool

主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至内存溢出的风险。

配置线程池需要考虑的因素

从任务的优先级,任务的执行时间长短,任务的性质( CPU密集 IO密集),任务的依赖关系这四个角度分析,并且近可能地使用有界的工作队列。

例如:

  1. CPU密集型: 尽可能少的线程,Ncpu+1。
  2. IO密集型: 尽可能多的线程,Ncpu*2,比如数据库连接池。

文章作者: L Q
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 L Q !
  目录