亲爱的女儿6岁了, 6岁的时间里, 我换了三家公司, 仔细算了算工作也就13年, 时间真是一个神奇的东西.
简单总结完了Java里边与并发相关的原语和基础知识, 现在用Java写点小玩意问题不大了, 而且由于面向对象的思路, 传递参数要比C语言底层方便一些.
继续来看看Java的并发工具包java.util.concurrent
中提供的一些工具, 就是专门用于多线程并发的类.
- 信号量 Semaphore
- 读写锁 ReentrantReadWriteLock
- 线程计数器 CountDownLatch
- 循环栅栏 CyclicBarrier
- 阻塞工具 LockSupport
信号量
信号量也是老朋友了, 相比之前的可重入锁和条件变量, 信号量突出的特点就是可以放超过1个的线程进入临界区.
构造函数有两个, 一个参数是信号量的int整数, 表示可以同时放多少个线程进入临界区, 第二个参数可以指定是否公平.
方法有:
void acquire()
void acquireUninterruptibly()
boolean tryAcquire()
boolean tryAcquire(long timeout, TimeUnit unit)
void release()
从方法的名称就可以很明显的看出来作用, 信号量搭配条件变量就可以来实现一个生产者-消费者队列了. 不过这里暂时不写了. 先实验一下信号量:
import java.util.concurrent.Semaphore;
public class SemaphoreTest implements Runnable {
public static volatile int i = 0;
public static Semaphore semaphore = new Semaphore(4);
@Override
public void run() {
semaphore.acquireUninterruptibly();
for (int j = 0; j < 100000; j++) {
i = i + 1;
}
System.out.println(Thread.currentThread().getName() + " 完成工作");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.release();
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 20; i++) {
Thread thread = new Thread(new SemaphoreTest());
thread.start();
}
Thread.sleep(6000);
System.out.println("i=" + i);
}
}
这里启动了一个同事放4个线程进临界区的信号量, 然后每次先无打断等待信号量, 之后再释放信号量即可.
这里没有用互斥锁保护i, 所以可以发现, 进入了临界区之后, 读写共享变量依然需要互斥锁的保护, 可以写成如下:
synchronized (semaphore) {
i = i + 1;
}
这样就用同一个互斥锁保护了共享变量i.
读写锁
读写锁也在底层的时候看过了, 读写锁主要用来大批量的读和少部分更新的情况.
由于读和读之间完全不需要加锁, 因为不会修改. 只有读写和写写操作之间需要相互等待和获取锁. 如果读的次数远远大于写的次数, 读写锁就可以获取最大的性能.
其底层实现也很有意思, 就是通过一个互斥变量来统计当前所有的读者, 只把读者放进来, 而写者要进来的时候要等待读者放弃锁, 写者获取共享变量锁写入, 之后再释放.
这里看看作者的例子:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockTest {
private static Lock lock = new ReentrantLock();
//创建一个读写锁对象, 然后调用其中的方法获取读锁和写锁
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readlock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
private int value;
//模拟读操作, 线程睡眠一秒 然后返回value
public int handleRead(Lock lock) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
return value;
} finally {
lock.unlock();
}
}
//模拟写操作 用index更新value
public void handleWrite(Lock lock, int index) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
value = index;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
final ReadWriteLockTest demo = new ReadWriteLockTest();
//创建模拟读的线程, 使用读锁
Runnable readRunnable = new Runnable() {
@Override
public void run() {
try {
demo.handleRead(readlock);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//创建模拟写的线程, 使用写锁写入一个随机整数
Runnable writeRunnable = new Runnable() {
@Override
public void run() {
try {
demo.handleWrite(writeLock,new Random().nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//创建20个读者和2个写者:
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 20; i++) {
Thread reader = new Thread(readRunnable);
threads.add(reader);
reader.start();
}
for (int j = 0; j < 2; j++) {
Thread writer = new Thread(writeRunnable);
threads.add(writer);
writer.start();
}
//等待所有线程结束
long start = System.currentTimeMillis();
for (Thread thread : threads) {
thread.join();
}
System.out.println("EndTime is :" + (System.currentTimeMillis() - start));
}
}
这段程序的所有读者和所有写者都花费1秒钟进行读或者写, 一共有22个线程, 20个读者, 2个写者.
如果是普通的互斥锁, 主线程等待所有的线程完成工作之后, 大概要等待23秒左右. 但是这里使用了读写锁, 即使一个拿到读锁的读者在休眠, 其他读者仍然可以进入.
所以整体上整个程序的运行时间会大大缩短. 如果将标红的两行中使用的读写锁替换成类中静态的互斥锁lock对象, 可以同样达到保护共享变量value的作用, 但是整个程序的运行时间将大大延长:
这个程序使用读写锁的运行时间基本上在3秒左右, 而使用互斥锁的运行时间在22秒左右.
线程计数器 CountDownLatch
这个倒计时器的意思在于像一道门一样堵住当前线程的执行, 等计数器为0, 再让当前线程开始工作. 构造函数接受一个整数作为参数, 即计数器的计数个数.
看了一下简单用法:
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchTest implements Runnable {
static final CountDownLatch counter = new CountDownLatch(10);
static final CountDownLatchTest demo = new CountDownLatchTest();
//每个线程执行工作之后, 通知计数器减1
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(10) * 1000);
System.out.println("check complete");
counter.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//主线程开启一个10个线程的固定线程池, 然后每个线程提交上边的任务
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.submit(demo);
}
//主线程等待计数器到0
counter.await();
System.out.println("所有线程完成了工作");
executorService.shutdown();
}
}
可见这个提供了一个比join()更加好的控制线程协作的方法.
循环栅栏 CyclicBarrier
这个和上边的计数器有点类似, 但是可以循环, 于是可以每次等待一批线程完成工作, 再等待下一批. 这个有意思的是先调用一个方法就可以自动完成计数并放行.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static class Worker implements Runnable {
private String name;
private final CyclicBarrier barrier;
@Override
public void run() {
try {
barrier.await();
work();
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
public Worker(CyclicBarrier barrier, String name) {
this.name = name;
this.barrier = barrier;
}
void work() {
System.out.println(name + "开始工作....");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + "工作结束...");
}
}
public static class Commander implements Runnable {
boolean flag;
int N;
public Commander(boolean flag, int n) {
this.flag = flag;
N = n;
}
@Override
public void run() {
if (flag) {
System.out.println("当前一批工人全部干完活了");
} else {
System.out.println("当前工人集合完毕, 出发干活");
flag = true;
}
}
public static void main(String[] args) {
final int N = 40;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Commander(flag, N));
System.out.println("集合队伍");
for (int i = 0; i < N; i++) {
System.out.println("工人" + i + "报道");
allSoldier[i] = new Thread(new Worker(cyclicBarrier, "士兵" + i));
allSoldier[i].start();
}
}
}
}
这里的关键是两行红色, 第一次调用await()就是进行等待至计数到10, 第二次调用就是开始下一次调用, 各个线程在调用之后就会进行阻塞, 之后完成工作, 然后再调用就会进行阻塞.
而同一个CyclicBarrier对象就会在内部进行控制, 每次计数到10的时候就让一批线程进行工作, 然后再进行计数.
这里的异常还需要注意一下, 除了通用的打断异常之外, 还有一个BrokenBarrierException
, 表示当前的栅栏已经破损. 一般如果一批线程已经集中在一个栅栏里, 然后有一个线程出现了异常, 剩下的线程都会得到一个BrokenBarrierException, 因为这个时候栅栏失效就很可能无法继续了.
阻塞工具 LockSupport
这个阻塞工具就是之前操作系统里提到的, 线程调用过一个停止阻塞的方法之后, 再去阻塞, 也不会阻塞而是直接继续执行. 这样就避免了线程继续执行指令和阻塞指令乱序的问题.
这个阻塞工具类LockSupport只有静态方法可用, 方法如下:
park(Thread thread)
, 阻塞当前线程
unpark(Thread thread)
, 让当前线程停止阻塞
这两个方法调用的前后顺序没有关系, 如果park()在先, unpark()会让线程停止阻塞. 如果unpark()在先, 则park()调用的时候不会阻塞, 而会直接往下继续运行.
这其中的机制在操作系统那里看到过, 实际上相当于给每个线程一个信号量一样的东西, park()调用会去修改那个信号量, unpark()也会. 这样无论先后, 两个函数都会知道当前的状态, 因此选择是阻塞还是继续执行.
一个简单的例子如下:
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
public static class MyThread implements Runnable {
@Override
public void run() {
System.out.println("准备一秒钟后调用阻塞方法");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.park();
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread());
thread.start();
System.out.println("主线程尝试在启动副线程后就将其unpark()");
LockSupport.unpark(thread);
thread.join();
System.out.println("全部线程执行完毕");
}
}
这两个的用法其实是针对已经不太使用的thread.suspend()
和thread.resume()
来说的, 这两个方法必须严格的前后调用, 否则会导致线程挂起但没人唤醒.
而LockSupport就无需这种担心, 主线程可以保证一定会让副线程继续运行. 将上边的两行红色部分分别换成:
Thread.currentThread().suspend();
thread.resume();
程序就会卡死, 这是因为主线程先调用了副线程resume, 之后副线程再进入suspend, 就没法被唤醒了. 使用IDEA的话会发现, 这两个方法已经标记为废弃.
今天是7月1日啦, 一年一年时间过的好快, 未来还不知道有什么等着自己, 总之目前就继续做吧.