并发 - Java并发工具 ThreadLocal与Atomic类

并发 - Java并发工具 ThreadLocal与Atomic类

最近又是折腾工作又是卖房子, 不过不管怎么说, 一切还是向着好的方向发展, 至于工作就慢慢来吧. 反正水平在, 不怕的. 这个夏天看来必定是不能安心的继续看开发了, 不过有空还是要抓紧学习. 毕竟开发这一行业, 日新月异的速度可比财务这种就是在原地打转的行业要高太多了. Java虚拟机对锁的一些策略

最近又是折腾工作又是卖房子, 不过不管怎么说, 一切还是向着好的方向发展, 至于工作就慢慢来吧. 反正水平在, 不怕的. 这个夏天看来必定是不能安心的继续看开发了, 不过有空还是要抓紧学习. 毕竟开发这一行业, 日新月异的速度可比财务这种就是在原地打转的行业要高太多了.
  1. Java虚拟机对锁的一些策略
  2. ThreadLocal
  3. 无锁操作-CAS
  4. AtomicInterger
  5. AtomicReference与AtomicStampedReference
  6. 其他原子类

Java虚拟机对锁的一些策略

想要使用好锁, 就要了解虚拟机, 现在逐渐深入了就会知道确实要了解虚拟机, 包括内存模型等, 确实很有用处. Java对更高效的使用锁, 做出了一些努力:
  1. 锁偏向, 即一个线程如果拿到锁, 就会倾向于再次拿到锁, 这个在锁竞争不激烈的程序里, 就好比之前的各个例子里, 很普遍.
  2. 轻量级锁, 这个感觉书里写的不是很明白, 还得自己再研究. 先了解一下, 争抢锁的时候会有从轻量级到普通锁的一个扩展
  3. 自旋锁, 所谓自旋就是while循环. 如果一个线程拿不到锁, 就会让其进行几个自旋, 赌一下会拿到锁. 自旋以后真的拿不到锁, 则会真实的把线程在操作系统层面挂起.
  4. 锁消除, 指的是虚拟机会在不存在并发竞争的情况下, 进行逃逸分析, 如果一个函数用到的变量全部都是本地变量, 则不会加锁.
  5. ConcurrentSkipListMap<K,V>, 这是一个特别的数据结构叫做跳表, 本质是一个多层链表, 实现的功能类似于Map, 实际中性能还不错.
另外java.util.vector是线程安全的, 不过现在基本上也没人用了.

ThreadLocal

锁的思路来源于保护共享变量. 而ThreadLocal来自于以空间来降低复杂度, 也就是经典的给10个人每人发一个篮球的笑话. 如果线程需要反复取用局部变量, 而这个局部变量在每个线程中是不同的, 那么就可以使用ThreadLocal当做容器, 注意, 存入容器的对象必须是由应用来保证是对每个线程独立的, 否则就跟不加锁一样, 取出来(就是一个指针)还是一样会造成错误. 既然是局部变量, 其实本身也就没有多线程问题了, 这个只不过是多个线程可以复用同一个ThreadLocal对象, 比较方便而已. 看一个例子:
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalTest {

    private static final SimpleDateFormat sdf = new SimpleDateFormat(("yyyy-MM-dd HH:mm:ss"));

    public static class ParseDate implements Runnable {

        int i = 0;

        public ParseDate(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            try {
                Date t = sdf.parse("2020-07-07 16:42:" + i % 60);
                System.out.println(i + ":" + t);
            } catch (ParseException e) {
                e.printStackTrace();

            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 100; i++) {
            executorService.submit(new ParseDate(i));
        }

        executorService.shutdown();

    }

}
这段代码, 让SimpleDateFormat对象作为共享变量, 然后去调用其的parse()方法, 由于所有线程固定parse的都是2020-07-07 16:42开始的字符串, 所以可以预期解析结果会差不多. 但是实际结果很有意思:
0:Sat Oct 07 16:42:00 CST 2220
86:Sun Sep 12 08:00:26 CST 2106
97:Sat Sep 12 13:44:17 CST 2020
57:Tue Jul 07 16:07:57 CST 2020
会发现有些日期和有些时间都是不应出现的情况, 这是因为parse()方法不支持多线程. 这里的解决办法就是加锁即可. 但是如果想以空间换效率的话, 其实就不需要让线程去抢锁, 而是每个线程发一个独立的sdf对象即可, ThreadLocal就一个get和一个set方法, 放进去什么取出来什么. 修改一下代码:
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalTest {

    private final static ThreadLocal<SimpleDateFormat> data = new ThreadLocal<>();

    public static class ParseDate implements Runnable {

        int i = 0;

        public ParseDate(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            try {
                if (data.get() == null) {
                    data.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
                }
                Date t = data.get().parse("2020-07-07 17:02:" + i % 60);
                System.out.println(i + ":" + t);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 100; i++) {
            executorService.submit(new ParseDate(i));
        }
        executorService.shutdown();
    }
}
这里其实是说明, 每个线程虽然都对同一个ThreadLocal对象执行操作, 但是放进去和取出来的就是彼此不同的. 其隔离是在ThreadLocal内部完成的. 当然, 如果不是新创建, 而是每个线程依然放入同一个SDF对象, 那依然会出问题. 可见如果有100个线程, 实际上ThreadLocal中就存放了100个不同的SDF对象, 对空间的占用还是相当大的, 而且创建对象也会有花费, 因此需要定期清理. 执行data.remove();就可以将ThreadLocal中对应当前线程内容的部分清空. 如果要清空整个ThreadLocal, 可以将其设置为null, 但千万要注意其他线程就不要再使用ThreadLocal对象了. ThreadLocal对于性能的帮助, 主要在于加锁解锁会比较耗性能, 线程竞争激烈的情况下, 这种时候为每个线程创建一个不怎么消耗资源的对象, 可能比抢共享变量效率更高. 如果对象用完就可以舍弃, 那么整体而言在计算过程中可能内存占用就较高, 然后内存就会恢复到计算之前的状态.

无锁操作-CAS

无锁操作就是之前在看操作系统的时候, 提到的比较并交换的原子操作. 实际上就是很多锁的内部使用的CPU指令.因为是单条指令, 所以可以确保原子性. 自旋锁是测试并交换, 比较并交换说是无锁, 其实是Java层面的无需创建锁, 在这里就可以看到使用比较并交换实现的锁. 比较并交换还会返回真实的当前值, 所以可以用真实的当前值再去比较并交换, 一直到可以更新成新的数字为止. Java的并发包中提供了一个java.util.concurrent.atomic包, 这个包实际上在内部就使用了CAS, 也就是说如果操作这个包中的对象, 可以不用加锁, 等于内部使用CAS给加上锁了. 文档可以看 Package java.util.concurrent.atomic, 其中的主要类有:
  1. AtomicBoolean
  2. AtomicInteger
  3. AtomicIntegerArray
  4. AtomicIntegerFieldUpdater<T>
  5. AtomicLong
  6. AtomicLongArray
  7. AtomicLongFieldUpdater<T>
  8. AtomicMarkableReference<V>
  9. AtomicReference<V>
  10. AtomicReferenceArray<E>
  11. AtomicReferenceFieldUpdater<T,V>
  12. AtomicStampedReference<V>
  13. DoubleAccumulator
  14. DoubleAdder
  15. LongAccumulator
  16. LongAdder
这其中的Atomic+基本类型, 就可以当成一个多线程安全的基础变量来操作. FieldUpdater则用是基于反射技术, 更新一个指定的或者自行编写的类的volatile标识的域. Reference指的是对象, 用于原子性的更新一个对象的相关内容. StampedReference则是在内部维护了一个每次更新的记录(一个整数值)与一个要更新的对象, 每次会比较这个整数值然后更新对象. 后边的累加器用于一个或多个变量共同维护一个其他的变量的情况, 具体我还没仔细了解过. 常用的就是上边的两大类.

AtomicInterger

文档看这里. 构造方法有两个, 一个没有参数, 即默认构造一个内部值为0. 如果带参数, 参数是一个int, 内部值就设置为这个int值. 主要的方法大概可以分为如下几个类型:

    采用函数式接口可以进行自定义计算

  1. getAndAccumulate(int x, IntBinaryOperator accumulatorFunction), 这个和accumulateAndGet唯一不同的是返回更新前的值.
  2. accumulateAndGet(int x, IntBinaryOperator accumulatorFunction), 这个方法使用给出的参数x和AtomicInterger的当前值, 将两个值通过IntBinaryOperator函数式接口计算完之后的结果去更新AtomicInterger, 然后返回更新后的值.
  3. getAndUpdate(IntUnaryOperator updateFunction), 获取当前值, 交给IntUnaryOperator运算后, 将结果设置为新值, 返回旧值.
  4. updateAndGet(IntUnaryOperator updateFunction), 与getAndUpdate不同的是返回新值.
  5. 普通的获取值和设置值

  6. get(), 获取当前值.
  7. getAndSet(int newValue), 设置为新值, 并返回旧值.
  8. void set(int newValue), 设置为当前值,返回值是void.
  9. void lazySet(int newValue), 最终设置为新值, 可能不会立刻设置, 注意返回值是void.
  10. 自增自减和增加/减少指定数值

  11. getAndAdd(int delta), 增加delta, 但是返回旧值. 注意get在前的都返回旧值, 操作在前的, 都返回操作后的值也就是新值.
  12. addAndGet(int delta), 向当前值增加delta, 返回更新后的值.
  13. getAndDecrement(), 减1并返回旧值.
  14. decrementAndGet(), 将当前值减1, 返回新值.
  15. getAndIncrement(), 加1并返回旧值.
  16. incrementAndGet(), 加1并返回新值.
  17. CAS方法

  18. compareAndSet(int expect, int update), 标准的CAS方法, 如果当前值与expect值相等, 则将当前值设置为update值.
  19. weakCompareAndSet(int expect, int update), 弱版本CAS.
  20. 简便取值并转换

  21. intValue(), 返回int类型的当前值.
  22. longValue(), 返回当前值转换为long类型后的结果.
  23. doubleValue(), 获取当前值并将其转换为double类型返回.
  24. floatValue(), 获取当前值并将其转换为float类型返回.
有了这个类型之后, 就可以来更改一下最经典的抢锁更新同一个共享变量的代码了:
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntergerTest {

    private static AtomicInteger integer = new AtomicInteger();

    private static int j = 0;

    public static class MyThread implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                integer.getAndAdd(1);
                j++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 6; i++) {
            new Thread(new MyThread()).start();
        }
        Thread.sleep(2000);

        System.out.println("AtomicInteger = " + integer.get());
        System.out.println("NormalInteger = " + j);
    }
}
这段程序运行之后, 一个结果如下:
AtomicInteger = 60000
NormalInteger = 37277
这说明同样无锁, AtomicInteger实际上用内部的CAS锁保护了共享变量. 而NormalInteger因为没有任何保护, 所以是多线程不安全的.

AtomicReference与AtomicStampedReference

基本类型可以直接使用上边的AtomicInteger及类似的类, 而AtomicReference则是用来原子性操作对象的, 文档在这里. 既然是对象, 所以可以看到这个类有泛型, 需要使用具体的对象引用. 无参构造器则内部引用初始是null, 有参的话, 内部引用就是那个对象的引用. 其中的方法就不看了, 本质还是将V作为一个数据类型来操作的, 比如使用AtomicReference<Integer>就和直接使用AtomicInteger差不多. 不过这里有一个问题就是, 内部其实是使用equals方法进行比较, 但只能比较对象的两个瞬间的状态, 无法知道对象是否曾经被修改过. 所以对于状态复杂的对象, 会使用AtomicStampedReference类. AtomicStampedReference类在内部增加一个时间戳, 也提供了与其相关的新的API, 可以在更新的时候考虑时间戳的影响. 或者也可以将时间戳理解为版本. 比如一个对象只能被更新一次, 那么在编写多线程程序的时候, 初始将时间戳设置为0, 更新完毕的时候, 就将时间戳设置为1. 然后线程更新程序中会判断时间戳, 如果是0才更新, 1就不更新. 这样就会保证对象只被更新一次. 具体的API就不放了, 有了版本控制, 就可以实现一些更复杂的功能.

其他原子类

有了基本类型, 还有基本类型的数组, 也可以无锁操作. 这个直接查阅文档即可. FieldUpdater类型则是用于更新已经编写的类, 让其多线程安全. 这个需要通过反射的方法获取类和变量名称, 而且域必须要以volatile声明才可以. 先看一个例子, 假设我们有一个计数器, 这个计数器实际上是线程不安全的, 如下:
public class UnsafeCounter {

    volatile int counter = 0;

    public int getCounter() {
        return counter;
    }

    public void setCounter(int counter) {
        this.counter = counter;
    }

    public void increment() {
        counter = counter + 1;
    }
}

这里IDE会提示volatile变量没有使用原子操作, 很显然多线程的话会出问题, 写一个操作计数器的类如下:
public class AtomicIntergerTest {
    public static void main(String[] args) throws InterruptedException {
        UnsafeCounter unsafeCounter = new UnsafeCounter();

        for (int i = 0; i < 10; i++) {
            new Thread() {
                @Override
                public void run() {
                    for (int j = 0; j < 1000; j++) {
                        unsafeCounter.increment();
                    }
                }
            }.start();
        }

        Thread.sleep(2000);
        System.out.println("计数器是: " + unsafeCounter.getCounter() + " 应该是: 10000");
    }
}
运行结果类似于:
计数器是: 8690 应该是: 10000
现在来改造一下这个操作类, Updater通过反射获取计数器的参数, 通过Updater来操作实际的值.
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicIntergerTest {

    public static final AtomicIntegerFieldUpdater<UnsafeCounter> updater = AtomicIntegerFieldUpdater.newUpdater(UnsafeCounter.class, "counter");

    public static void main(String[] args) throws InterruptedException {
        UnsafeCounter unsafeCounter = new UnsafeCounter();

        for (int i = 0; i < 10; i++) {
            new Thread() {
                @Override
                public void run() {
                    for (int j = 0; j < 1000; j++) {
                        updater.incrementAndGet(unsafeCounter);
                    }
                }
            }.start();
        }

        Thread.sleep(2000);

        System.out.println("计数器是: " + unsafeCounter.getCounter() + " 应该是: 10000");

    }
}
红色部分是关键, 先是调用静态方法, 传入类型和属性名称, 创建一个updater对象, 然后用这个对象的incrementAndGet方法, 参数是实际的计数器对象. 这个打印出来的结果就没有问题, 相当于给原来计数器的counter域加上了一个线程安全的包装, 这个适用于侵入性不是很强的情况下修改原来的类.
LICENSED UNDER CC BY-NC-SA 4.0
Comment