并发 - Java并发工具 续

并发 - Java并发工具 续

亲爱的女儿6岁了, 6岁的时间里, 我换了三家公司, 仔细算了算工作也就13年, 时间真是一个神奇的东西. 简单总结完了Java里边与并发相关的原语和基础知识, 现在用Java写点小玩意问题不大了, 而且由于面向对象的思路, 传递参数要比C语言底层方便一些. 继续来看看Java的并发工具包java.

亲爱的女儿6岁了, 6岁的时间里, 我换了三家公司, 仔细算了算工作也就13年, 时间真是一个神奇的东西. 简单总结完了Java里边与并发相关的原语和基础知识, 现在用Java写点小玩意问题不大了, 而且由于面向对象的思路, 传递参数要比C语言底层方便一些. 继续来看看Java的并发工具包java.util.concurrent中提供的一些工具, 就是专门用于多线程并发的类.
  1. 信号量 Semaphore
  2. 读写锁 ReentrantReadWriteLock
  3. 线程计数器 CountDownLatch
  4. 循环栅栏 CyclicBarrier
  5. 阻塞工具 LockSupport

信号量

信号量也是老朋友了, 相比之前的可重入锁和条件变量, 信号量突出的特点就是可以放超过1个的线程进入临界区. 构造函数有两个, 一个参数是信号量的int整数, 表示可以同时放多少个线程进入临界区, 第二个参数可以指定是否公平. 方法有:
  1. void acquire()
  2. void acquireUninterruptibly()
  3. boolean tryAcquire()
  4. boolean tryAcquire(long timeout, TimeUnit unit)
  5. void release()
从方法的名称就可以很明显的看出来作用, 信号量搭配条件变量就可以来实现一个生产者-消费者队列了. 不过这里暂时不写了. 先实验一下信号量:
import java.util.concurrent.Semaphore;

public class SemaphoreTest implements Runnable {

    public static volatile int i = 0;

    public static Semaphore semaphore = new Semaphore(4);

    @Override
    public void run() {
        semaphore.acquireUninterruptibly();
        for (int j = 0; j < 100000; j++) {
            i = i + 1;
        }
        System.out.println(Thread.currentThread().getName() + " 完成工作");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        semaphore.release();
    }

    public static void main(String[] args) throws InterruptedException {

        for (int i = 0; i < 20; i++) {
            Thread thread = new Thread(new SemaphoreTest());
            thread.start();
        }

        Thread.sleep(6000);

        System.out.println("i=" + i);
    }

}
这里启动了一个同事放4个线程进临界区的信号量, 然后每次先无打断等待信号量, 之后再释放信号量即可. 这里没有用互斥锁保护i, 所以可以发现, 进入了临界区之后, 读写共享变量依然需要互斥锁的保护, 可以写成如下:
synchronized (semaphore) {
    i = i + 1;
}
这样就用同一个互斥锁保护了共享变量i.

读写锁

读写锁也在底层的时候看过了, 读写锁主要用来大批量的读和少部分更新的情况. 由于读和读之间完全不需要加锁, 因为不会修改. 只有读写和写写操作之间需要相互等待和获取锁. 如果读的次数远远大于写的次数, 读写锁就可以获取最大的性能. 其底层实现也很有意思, 就是通过一个互斥变量来统计当前所有的读者, 只把读者放进来, 而写者要进来的时候要等待读者放弃锁, 写者获取共享变量锁写入, 之后再释放. 这里看看作者的例子:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {

    private static Lock lock = new ReentrantLock();
    //创建一个读写锁对象, 然后调用其中的方法获取读锁和写锁
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readlock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    private int value;

    //模拟读操作, 线程睡眠一秒 然后返回value
    public int handleRead(Lock lock) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);
            return value;
        }  finally {
            lock.unlock();
        }
    }

    //模拟写操作 用index更新value
    public void handleWrite(Lock lock, int index) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);
            value = index;
        } finally {
            lock.unlock();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        final ReadWriteLockTest demo = new ReadWriteLockTest();

        //创建模拟读的线程, 使用读锁
        Runnable readRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleRead(readlock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };


        //创建模拟写的线程, 使用写锁写入一个随机整数
        Runnable writeRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleWrite(writeLock,new Random().nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //创建20个读者和2个写者:
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < 20; i++) {
            Thread reader = new Thread(readRunnable);
            threads.add(reader);
            reader.start();
        }

        for (int j = 0; j < 2; j++) {
            Thread writer = new Thread(writeRunnable);
            threads.add(writer);
            writer.start();
        }

        //等待所有线程结束
        long start = System.currentTimeMillis();
        
        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("EndTime is :" + (System.currentTimeMillis() - start));

    }


}
这段程序的所有读者和所有写者都花费1秒钟进行读或者写, 一共有22个线程, 20个读者, 2个写者. 如果是普通的互斥锁, 主线程等待所有的线程完成工作之后, 大概要等待23秒左右. 但是这里使用了读写锁, 即使一个拿到读锁的读者在休眠, 其他读者仍然可以进入. 所以整体上整个程序的运行时间会大大缩短. 如果将标红的两行中使用的读写锁替换成类中静态的互斥锁lock对象, 可以同样达到保护共享变量value的作用, 但是整个程序的运行时间将大大延长: 这个程序使用读写锁的运行时间基本上在3秒左右, 而使用互斥锁的运行时间在22秒左右.

线程计数器 CountDownLatch

这个倒计时器的意思在于像一道门一样堵住当前线程的执行, 等计数器为0, 再让当前线程开始工作. 构造函数接受一个整数作为参数, 即计数器的计数个数. 看了一下简单用法:
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTest implements Runnable {
    static final CountDownLatch counter = new CountDownLatch(10);
    static final CountDownLatchTest demo = new CountDownLatchTest();


    //每个线程执行工作之后, 通知计数器减1
    @Override
    public void run() {
        try {
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println("check complete");
            counter.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    //主线程开启一个10个线程的固定线程池, 然后每个线程提交上边的任务
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            executorService.submit(demo);
        }

        //主线程等待计数器到0
        counter.await();

        System.out.println("所有线程完成了工作");

        executorService.shutdown();
    }

}
可见这个提供了一个比join()更加好的控制线程协作的方法.

循环栅栏 CyclicBarrier

这个和上边的计数器有点类似, 但是可以循环, 于是可以每次等待一批线程完成工作, 再等待下一批. 这个有意思的是先调用一个方法就可以自动完成计数并放行.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    public static class Worker implements Runnable {

        private String name;
        private final CyclicBarrier barrier;

        @Override
        public void run() {
            try {
                barrier.await();

                work();

                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        public Worker(CyclicBarrier barrier, String name) {
            this.name = name;
            this.barrier = barrier;
        }

        void work() {
            System.out.println(name + "开始工作....");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + "工作结束...");
        }
    }

    public static class Commander implements Runnable {
        boolean flag;
        int N;

        public Commander(boolean flag, int n) {
            this.flag = flag;
            N = n;
        }


        @Override
        public void run() {
            if (flag) {
                System.out.println("当前一批工人全部干完活了");

            } else {
                System.out.println("当前工人集合完毕, 出发干活");
                flag = true;
            }
        }

        public static void main(String[] args) {
            final int N = 40;
            Thread[] allSoldier = new Thread[N];
            boolean flag = false;
            CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Commander(flag, N));

            System.out.println("集合队伍");

            for (int i = 0; i < N; i++) {
                System.out.println("工人" + i + "报道");
                allSoldier[i] = new Thread(new Worker(cyclicBarrier, "士兵" + i));
                allSoldier[i].start();
            }

        }

    }


}
这里的关键是两行红色, 第一次调用await()就是进行等待至计数到10, 第二次调用就是开始下一次调用, 各个线程在调用之后就会进行阻塞, 之后完成工作, 然后再调用就会进行阻塞. 而同一个CyclicBarrier对象就会在内部进行控制, 每次计数到10的时候就让一批线程进行工作, 然后再进行计数. 这里的异常还需要注意一下, 除了通用的打断异常之外, 还有一个BrokenBarrierException, 表示当前的栅栏已经破损. 一般如果一批线程已经集中在一个栅栏里, 然后有一个线程出现了异常, 剩下的线程都会得到一个BrokenBarrierException, 因为这个时候栅栏失效就很可能无法继续了.

阻塞工具 LockSupport

这个阻塞工具就是之前操作系统里提到的, 线程调用过一个停止阻塞的方法之后, 再去阻塞, 也不会阻塞而是直接继续执行. 这样就避免了线程继续执行指令和阻塞指令乱序的问题. 这个阻塞工具类LockSupport只有静态方法可用, 方法如下:
  1. park(Thread thread), 阻塞当前线程
  2. unpark(Thread thread), 让当前线程停止阻塞
这两个方法调用的前后顺序没有关系, 如果park()在先, unpark()会让线程停止阻塞. 如果unpark()在先, 则park()调用的时候不会阻塞, 而会直接往下继续运行. 这其中的机制在操作系统那里看到过, 实际上相当于给每个线程一个信号量一样的东西, park()调用会去修改那个信号量, unpark()也会. 这样无论先后, 两个函数都会知道当前的状态, 因此选择是阻塞还是继续执行. 一个简单的例子如下:
import java.util.concurrent.locks.LockSupport;

public class LockSupportTest {

    public static class MyThread implements Runnable {
        @Override
        public void run() {
            System.out.println("准备一秒钟后调用阻塞方法");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LockSupport.park();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new MyThread());

        thread.start();
        System.out.println("主线程尝试在启动副线程后就将其unpark()");
        LockSupport.unpark(thread);

        thread.join();

        System.out.println("全部线程执行完毕");

    }
}
这两个的用法其实是针对已经不太使用的thread.suspend()thread.resume()来说的, 这两个方法必须严格的前后调用, 否则会导致线程挂起但没人唤醒. 而LockSupport就无需这种担心, 主线程可以保证一定会让副线程继续运行. 将上边的两行红色部分分别换成:
Thread.currentThread().suspend();
thread.resume();
程序就会卡死, 这是因为主线程先调用了副线程resume, 之后副线程再进入suspend, 就没法被唤醒了. 使用IDEA的话会发现, 这两个方法已经标记为废弃. 今天是7月1日啦, 一年一年时间过的好快, 未来还不知道有什么等着自己, 总之目前就继续做吧.
LICENSED UNDER CC BY-NC-SA 4.0
Comment