Java毕竟还是偏工程和设计, 搞完一个基础理论, 接着就得上模式, 这里就跟着书外加自己写写来上点模式看看.
- 流水线模式
- 并行分工
流水线模式
流水线模式的核心是一个流水线, 与现实生活中一整条流水线, 各道工序同时加工不同, 线程的流水线不能这么长, 否则线程可能要等待很久, 而且也不好同步这么多线程.
书里的流水线模式是每个线程分配一个阻塞队列. 从第一个线程开始, 将业务数据扔进第一个线程的流水线. 第一个线程在启动后就会尝试从阻塞队列中读取数据, 如果读不到, 就会阻塞.
一旦读到, 就会进行处理, 并将处理完的数据扔到下一个线程的阻塞队列中. 下一个线程也一直在等待阻塞队列产生结果.
就这样一个一个线程下去, 直到最后处理完毕.
来尝试自己编写一下吧. 任务很简单, 送一个任意字符串进流水线, 然后在后边添加上"第一道工序", "第二道工序"之类的, 最后返回拼接后的字符串.
要编写的话, 每个线程类先编写好, 这就统一用内部类的方式写了. 先写一个传递数据的类:
public static class ProductMessage {
String message;
public ProductMessage(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
然后是各个工序的线程, 每个线程中弄一个自己的阻塞队列. 这是第一个线程, 为了方便投入原材料, 就直接继承了Thread类:
public static class First extends Thread {
private static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
public void offerRawMaterial(String material) throws InterruptedException {
blockingQueue.put(material);
}
@Override
public void run() {
while (true) {
try {
//使用阻塞方法take
String message = blockingQueue.take();
message = message + "第一道工序";
//使用阻塞方法put
Second.blockingQueue.put(message);
} catch (InterruptedException e) {
System.out.println("被打断, 结束线程");
break;
}
}
}
}
第二个线程就继承Runnable了, 和第一个线程几乎一样, 处理完以后向第三个线程中投放数据:
public static class Second implements Runnable {
private static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
@Override
public void run() {
while (true) {
try {
//使用阻塞方法take
String message = blockingQueue.take();
message = message + "第二道工序";
//使用阻塞方法put
Third.blockingQueue.put(message);
} catch (InterruptedException e) {
System.out.println("被打断, 结束线程");
break; }
}
}
}
第三个线程处理之后直接打印结果.
public static class Third implements Runnable {
private static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
@Override
public void run() {
while (true) {
try {
//使用阻塞方法take
String message = blockingQueue.take();
message = message + "第三道工序";
//使用阻塞方法put
System.out.println("有产品生产完成: " + message);
} catch (InterruptedException e) {
System.out.println("被打断, 结束线程");
break; }
}
}
}
这四个内部类外边套一个类, 用于启动:
public class AssemblyLine {
public static void main(String[] args) throws InterruptedException {
First first = new First();
Thread second = new Thread(new Second());
Thread third = new Thread(new Third());
//创建并启动三个线程, 一开始阻塞队列都是空, 所以三个线程都阻塞
first.start();
second.start();
third.start();
Scanner scanner = new Scanner(System.in);
//输入一行, 就向流水线中放入一个字符串
while (true) {
System.out.print("请输入字符串, 输入qqq退出: ");
String msg = scanner.nextLine();
if (msg.equals("qqq")) {
break;
}
System.out.println();
first.offerRawMaterial(msg);
}
//输入退出指令之后, 打断所有的线程, 根据之前编写的异常处理, 会直接退出
first.interrupt();
second.interrupt();
third.interrupt();
//等待所有的线程完成工作后退出
first.join();
second.join();
third.join();
System.out.println("完成工作");
}
}
实际试验了一下, 确实可以像流水线一样操作, 同时也知道了, 配合while循环和阻塞, 可以让线程被动的等待工作.
流水线模式
所谓并行分工, 就是有点像之前的Fork/Join模式, 将任务分解, 不过与其不同的是, 并行分工除了类似递归的模式, 还能用普通分工的模式, 就是每个线程负责一部分工作, 然后共同看一个结果是不是被找到.
在一些查找或者类似的工作中, 只要一个线程找到, 就可以通知其他线程或者使用一个共享变量, 然后集体结束任务并返回结果. 只要工作的各个部分不互相依赖, 就可以使用并发来操作.
来写一个例子试验一下, 并行从一个数组中进行搜索, 返回找到的值的索引.
先写最外层的类, 包含一个数组域, 以及一个表示是否找到的索引域, 这个索引需要用AtomicInteger:
public class MultiSearch {
private final List<Thread> threadList = new ArrayList<>();
//表示找到的索引
private final AtomicInteger result = new AtomicInteger(-1);
//线程的数量
private final int numberOfThreads = 6;
private final int[] array;
public MultiSearch(int[] array) {
this.array = array;
}
}
然后在其中编写内部线程类:
public class SearchThread extends Thread {
private final int startIndex;
private final int endIndex;
private int target;
//每个线程在指定的startIndex与endIndex-1之间查找target
public SearchThread(int startIndex, int endIndex, int target) {
this.startIndex = startIndex;
this.endIndex = endIndex;
this.target = target;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "开始运行From" + startIndex + " to " + endIndex);
boolean found = false;
for (int i = startIndex; i < endIndex; i++) {
//找到目标值
if (array[i] == target) {
//尝试使用CAS设置值, 只尝试一次, 然后直接结束即可
//如果成功设置了, 说明是第一个找到的线程
//如果没有成功, 说明其他线程已经先找到, 所以直接break即可
System.out.println(Thread.currentThread().getName() + "找到了 " + target + " 索引是: " + i);
result.compareAndSet(-1, i);
found = true;
break;
}
}
// 如果始终没有进过if, 运行到这里就是,一次都没找到, 直接结束了
if (!found) {
System.out.println(Thread.currentThread().getName() + "搜索结束, 没有找到");
}
}
}
线程类的CAS是核心, 如果找到就尝试CAS更新result, 如果发现已经被其他线程更新, 就直接break并退出, 说明其他线程已经找到了结果.
这里是为了打印过程, 所以加上了一些判断, 实际上found变量根本不需要, 只要找到就进行CAS, 然后直接break即可, 无需关心CAS是否成功.
最后为MultiSearch编写一个方法, 用于调度线程进行查找:
//查找方法
public int search(int target) throws InterruptedException {
//根据数组长度和线程数量, 数组长度小于1000就只创建一个线程进行查找, 没有必要创建多个线程
if (array.length <= 1000) {
Thread searchThread = new SearchThread(0, array.length - 1, target);
searchThread.start();
searchThread.join();
if (result.get() == -1) {
System.out.println("没有找到");
} else {
System.out.println("找到了, 索引是: " + result.get());
}
} else {
//长度超过1000, 分成按照步长的5个加上最后一个到末尾索引, 一共6个线程.
int step = array.length / numberOfThreads;
//创建前五个线程
for (int i = 0; i < numberOfThreads - 1; i++) {
System.out.println("索引范围是 " + i * step + "-->" + (step * (i + 1)));
Thread searchThread = new SearchThread(i * step, step * (i + 1), target);
threadList.add(searchThread);
searchThread.start();
}
//创建最后一个线程
System.out.println("索引范围是 " + step * (numberOfThreads - 1) + "-->" + array.length);
Thread searchThread = new SearchThread(step * (numberOfThreads - 1), array.length, target);
threadList.add(searchThread);
searchThread.start();
}
//等待所有线程完成工作
for (Thread t : threadList) {
t.join();
}
//返回result, 如果查找到, 这个就被更新为查找后的结果.
return result.get();
}
然后就可以实际编写一个测试来跑一下:
public class Test {
public static void main(String[] args) throws InterruptedException {
int[] array = new int[10000000];
for (int i = 0; i < 10000000; i++) {
array[i] = i;
}
MultiSearch searcher = new MultiSearch(array);
int index = searcher.search(9320420);
System.out.println("找到的索引是: "+index);
}
}
创建了一个长度是一千万的数组, 然后可以看到执行结果如下:
索引范围是 0-->1666666
索引范围是 1666666-->3333332
索引范围是 3333332-->4999998
索引范围是 4999998-->6666664
索引范围是 6666664-->8333330
索引范围是 8333330-->10000000
Thread-3开始运行From4999998 to 6666664
Thread-4开始运行From6666664 to 8333330
Thread-0开始运行From0 to 1666666
Thread-2开始运行From3333332 to 4999998
Thread-5开始运行From8333330 to 10000000
Thread-1开始运行From1666666 to 3333332
Thread-3搜索结束, 没有找到
Thread-2搜索结束, 没有找到
Thread-1搜索结束, 没有找到
Thread-4搜索结束, 没有找到
Thread-0搜索结束, 没有找到
Thread-5找到了 9320420 索引是: 9320420
找到的索引是: 9320420
6个线程在不同的区域内进行查找, 然后有一个线程找到了. 如果各个线程的查找任务属于计算密集型的话, 这个程序的理论效率是单线程的6倍.