- 资源共享和终止的例子
- 线程状态
- 打断任务
- 线程协作
- 生产者消费者模型
- 同步队列
- 管道
之前的线程都是一个运行到自己结束的任务, 但有些时候, 多线程程序的各个线程是一直在工作的, 需要得到外部的消息来是否继续运行. 同时各个线程也会有共享数据, 比如一个统计多个门进入人数的程序:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class OrnamentalGarden {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(new Entrance(i));
}
//主线程睡三秒之后关闭所有门
TimeUnit.SECONDS.sleep(3);
Entrance.cancel();
executorService.shutdown();
if (!executorService.awaitTermination(250, TimeUnit.MILLISECONDS)) {
System.out.println("Some tasks are not terminated");
}
System.out.println("--------------------------------");
System.out.println("Total: " + Entrance.getTotalCount());
Entrance.showAllNumber();
}
}
// 统计人数的对象
class Count {
private int count = 0;
private Random rand = new Random();
public synchronized int increment() {
int temp = count;
if (rand.nextBoolean()) {
Thread.yield();
}
return (count = ++temp);
}
public synchronized int getCount() {
return count;
}
}
//多个门, 用于统计每个门进出的人数, 用多线程操作
class Entrance implements Runnable {
private static Random random = new Random();
private static Count count = new Count();
//创建一个列表用于存放所有的线程对象, 以控制该线程
//这和CSAPP里的使用线程号本质上一样
private static List<Entrance> entrances = new ArrayList<>();
//每一个门都从0个人开始统计
private int number = 0;
//门的id
private final int id;
//门是不是关闭
private static volatile boolean canceled = false;
//设置门关闭
public static void cancel() {
canceled = true;
}
//初始化的时候, 把新对象加入到列表中
public Entrance(int id) {
this.id = id;
entrances.add(this);
}
public void run() {
//在每个线程中, 定期的进一个人, 然后打印
while (!canceled) {
synchronized (this) {
//自己对象里的统计当前门进了多少人
++number;
}
//打印出目前的总人数
System.out.println(this + "Total: " + count.increment());
try{
TimeUnit.MILLISECONDS.sleep(random.nextInt(10) * random.nextInt(20));
} catch (InterruptedException e) {
System.out.println("Sleep interrupted");
}
}
//运行到这里门关闭了, 就打印
System.out.println("Door " + id + " stopped");
}
public synchronized int getNumber() {
return this.number;
}
@Override
public synchronized String toString() {
return "Entrance{" +
"id=" + id +
", number=" + number +
'}';
}
public static int getTotalCount() {
return count.getCount();
}
public static int sumEntrances() {
int sum = 0;
for (Entrance entrance : entrances) {
sum += entrance.getNumber();
}
return sum;
}
public static void showAllNumber() {
for (Entrance entrance : entrances) {
System.out.println(entrance);
}
}
}
这个程序是一个各个线程保存自己的数据并且协同主线程工作之外, 关键是了解如何来控制线程工作. 所有的线程对象共享一个变量 private static volatile boolean canceled = false; 用于控制门是否关闭.
由于基本类型赋值是原子操作, 也声明了volatile, 所以这里可以不同步, 当然为了不迷惑, 也可以加上同步.
这里的关键是红字的部分, 即利用了线程管理器的监听线程结束的情况. 在设置了门关闭之后, 不应该有继续存活的线程.
executorService.awaitTermination(250, TimeUnit.MILLISECONDS)
这个方法用于监听线程, 参数是时间, 在指定的时间之前, 所有的线程都被关闭, 返回true, 否则返回false.
这里利用的小技巧就是虽然所有的线程都退出了, 看上去已经不存在了, 实际上在类里边维护了一个静态变量, 用于存放所有的线程对象, 所以executorService.awaitTermination(250, TimeUnit.MILLISECONDS)
可以正常工作.
线程状态
一个线程处于四种状态之一
- 新建, 只在新创建的时候, 会短暂的处于这个状态. 线程下一步将会变成可运行或者阻塞
- 就绪, 可以运行, 也可以不运行, 只要分配了时间片就可以运行
- 阻塞, 调度器会忽略这个线程, 直到其从阻塞进入就绪状态
- 死亡, 不可再调度, 也不会再得到CPU时间, 任务结束通常是run()方法返回, 但也可以被其他的方式结束
进入阻塞有如下几种情况:
- 自己调用sleep()
- 调用wait()挂起线程, 线程会等待notify()或者notifyAll()后进入就绪状态
- 在I/O上阻塞
- 等待获取锁
现在我们要来看, 如何让线程在run()返回之前结束其任务, 这样才能让线程协同工作
打断任务 - 这里的打断指的是让任务结束阻塞状态, 不是强行中止任务.
本身Thread类就有.interrupt()方法, 持有线程对象就可以调用这个方法强行中断. 如果不具体操作每个对象, 也可以通过Executor来执行所有操作.
如果调用 Executor.shutdownNow(), 会发送一个interrupt()给它启动的所有线程.
如果只希望终止一个, 可以使用submit(), 会返回一个Future对象, 在Future对象上调用.cancel()就可以中断特定的任务.
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class Interrupting {
private static ExecutorService executorService = Executors.newCachedThreadPool();
static void test(Runnable r) throws InterruptedException {
Future<?> f = executorService.submit(r);
TimeUnit.MILLISECONDS.sleep(1);
System.out.println("Interrupting " + r.getClass().getName());
//传入true表示打断这个线程
f.cancel(true);
System.out.println("Interrupting sent to " + r.getClass().getName());
}
public static void main(String[] args) throws InterruptedException {
test(new SleepBlocked());
test(new IOBlocked(System.in));
executorService.shutdown();
}
}
class SleepBlocked implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
System.out.println("Interrupted");
System.out.println("Intterupted 之后执行的代码");
}
System.out.println("正常退出run()");
}
}
class IOBlocked implements Runnable {
private InputStream inputStream;
public IOBlocked(InputStream inputStream) {
this.inputStream = inputStream;
}
public void run() {
try {
System.out.println("Waiting for input: ");
inputStream.read();
} catch (IOException e) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Interrupted from blockedI/O");
} else {
throw new RuntimeException(e);
}
}
System.out.println("Exiting IOBlocked.run()");
}
}
class SynchroizedBlocked implements Runnable {
public synchronized void f() {
while (true) {
Thread.yield();
}
}
public SynchroizedBlocked() {
new Thread() {
public void run() {
f();
}
}.start();
}
public void run() {
System.out.println("Trying to call f()");
f();
System.out.println("Exiting SynchroizedBlocked.run()");
}
}
实际上无法打断阻塞在I/O的线程, 因为即使打断了, 还是在等待I/O.
线程协作
之前访问共享变量, 是让线程可以安全的操作同一个内容, 现在线程还可以互相协作指的是等待, 通知, 唤醒等.
可以把每一个线程当成一个微型的程序, 这些程序之间可以互相进行通信, 叫醒和睡眠等.
本质上就是采取一种信号的方式, 让线程之间可以互相响应来做事情.
先要看的是wait()和notifyAll().
wait()函数在调用完毕之后, 调用这个函数的线程是无法自己再执行任务的, 必须要等待其他线程将其激活. 等待的信号就是notifyAll().
sleep()和yield()都不会释放锁, 只会很大概率让调度器执行其他线程. 而 wait() 会释放锁, 这是一个很关键的特性.
.wait()可以接受毫秒数作为参数, 表示挂起的时间, 作用和sleep()有些类似(但是会释放锁). sleep()需要打断, 而.wait()是等待notifyAll()消息
wait()如果不加参数, 就是无限等待下去, 直到接收到notify()或者notifyAll()消息.
还要注意的是, wait(), notify(), notifyAll()都是Object的方法之一, 而不是Thread的方法. 而且必须在同步方法内部才能调用这三个方法, 也即调用者三个方法的线程已经进入到了同步方法内部, 也即已经获取了锁.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class WaxOnMatic {
public static void main(String[] args) throws InterruptedException {
Car car = new Car();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new WaxOn(car));
executorService.submit(new WaxOff(car));
TimeUnit.SECONDS.sleep(10);
//前边学的, 给所有线程发送interrupt信号, 我们靠这个机制结束线程
executorService.shutdownNow();
}
}
class Car {
//有了四个方法之后, 打蜡的任务需要使用两个方法, 即打完腊通知其他线程干活, 以及检查打蜡标记, 如果已经打完腊, 就继续等待
//抛光的任务则需要使用抛完光再通知其他线程干活, 以及检查抛光标记, 已经抛光过就一直等待
//打蜡标记同时也是抛光标记, 为false的是表示抛光完成, 为true的时候表示打完腊
private boolean waxOn = false;
public synchronized void waxed() {
//干完打蜡就通知所有线程干活
waxOn = true;
notifyAll();
}
public synchronized void buffed() {
//干完抛光就通知其他线程
waxOn = false;
notifyAll();
}
public synchronized void waitForWaxing() throws InterruptedException {
//如果没打蜡就继续等待
while (!waxOn) {
wait();
}
}
public synchronized void waitForBuffed() throws InterruptedException {
//如果没抛光就继续等待
while (waxOn) {
wait();
}
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car car) {
this.car = car;
}
public void run() {
try {
while (!Thread.interrupted()) {
System.out.println("开始打蜡.");
TimeUnit.MILLISECONDS.sleep(500);
//打蜡
car.waxed();
//等抛光
car.waitForBuffed();
}
//用了try-catch, 打断任务实际上变成了终止线程的工具
} catch (InterruptedException e) {
System.out.println("打蜡任务被中断 Interrupted");
}
System.out.println("结束打蜡任务");
}
}
class WaxOff implements Runnable {
private Car car;
WaxOff(Car car) {
this.car = car;
}
public void run() {
try {
while (!Thread.interrupted()) {
//注意这里的顺序, 先等待, 再执行, 多个任务的话, 要有一个先执行并且改变标记, 其他的任务等待标记改变之后再来
car.waitForWaxing();
System.out.println("开始抛光");
TimeUnit.MILLISECONDS.sleep(300);
car.buffed();
}
} catch (InterruptedException e) {
System.out.println("抛光任务被中断 Interrupted");
}
System.out.println("结束抛光任务");
}
}
这个例子有如果设置多个打蜡和抛光的线程, 也是可以工作的, 每次只有拿到锁的线程进行工作, 然后再通知其他线程来工作. 其实很像CSAPP上的生产者和消费者模型.
等待的线程一般都需要使用检查标记和while循环来不断等待, 这也是多线程的套路了, 反正阻塞了不会占用太多资源.
只是要注意并不像底层的信号量使用那么直接, 而是要在共享变量或者说要操作的对象上设置同步方法,然后在其中设置wait()和notify().
还有要注意的就是, 这里都使用了同一个Car对象, 而Car对象中的 sync 的锁就是car对象本身, notify()和notifyAll()只能唤醒等待同一个锁的对象, 而不是全部的对象. 当然这一点也是不言自明的.
生产者消费者模型
终于到了Java 中的生产者消费者模型, 在CSAPP底层中, 使用了一个数组和三个信号量来判断, 其中一个信号量是用于同步的互斥信号量, 另外两个信号量则用作有多少个数据和空槽可用的标记.
来看看Java 的生产者和消费者模型:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
//餐馆实际上是Meal的容器, 两个线程使用同一个餐馆对象, 其中持有Meal对象, 对于容器访问, 需要加锁才行
public class Restaurant {
Meal meal = null;
ExecutorService exec = Executors.newCachedThreadPool();
//这个很有意思, 直接以当前对象传给Consumer
final Consumer consumer = new Consumer(this);
final Chef chef = new Chef(this);
public Restaurant() {
exec.execute(chef);
exec.execute(consumer);
}
public static void main(String[] args) {
new Restaurant();
}
}
//这个类就是要消费的数据.
class Meal {
private final int number;
public Meal(int number) {
this.number = number;
}
public int getNumber() {
return number;
}
@Override
public String toString() {
return "Meal{" +
"number=" + number +
'}';
}
}
//消费者, 就是把肉拿走的人
class Consumer implements Runnable {
private Restaurant restaurant;
public Consumer(Restaurant restaurant) {
this.restaurant = restaurant;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
//餐馆里没有肉, 就等待
while (restaurant.meal == null) {
wait();
}
}
System.out.println("Consumer got a meal");
//消费完肉之后, 通知厨子做饭
synchronized (restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll();
}
}
} catch (InterruptedException e) {
System.out.println("中断消费者");
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant restaurant) {
this.restaurant = restaurant;
}
public void run(){
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal != null) {
wait();
}
}
if (++count == 10) {
System.out.println("生产完了10顿饭.");
restaurant.exec.shutdownNow();
}
System.out.println("开始做饭");
synchronized (restaurant.consumer) {
restaurant.meal = new Meal(count);
restaurant.consumer.notifyAll();
}
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
System.out.println("中断厨子");
}
}
}
这个模型是一个线程的消费者和一个线程的生产者, 餐馆是数据对象Meal的持有者.
这里由于是两个单独的线程, 所以在生产和消费数据, 也就是读写meal的时候, 用自己加锁, 而在唤醒另一个生产者/消费者的时候, 用对应的对象加锁.
如果meal是一个容器, 还可以全部都对meal进行加锁, 在其中设置一些变量用来取数据.可以很容易的就把CSAPP的模型移植到Java中来, 只要对象监视器都是同一个就可以了.
然后自己根据练习24的要求, 写了个内部是定长数组的缓冲区队列持有数据, 然后有多个消费者和生产者对象的模型:
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 解决带有缓冲区的单个生产者和消费者的问题
* 初步想法是弄带有ID标识的数据对象
* 然后弄一个缓冲区对象Buffer持有数据对象
* Buffer中的放入和取出数据需要同步, 然后用内部的指针来保存当前的状态
*/
//
public class Ex24 {
public static void main(String[] args) throws InterruptedException {
Buffer buffer = Buffer.getBuffer(20);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 1; i++) {
executorService.execute(new Producer(buffer));
}
for (int j = 0; j < 3; j++) {
executorService.execute(new Consumer(buffer));
}
TimeUnit.SECONDS.sleep(3);
executorService.shutdownNow();
System.out.println(buffer);
}
}
//这个作为数据对象
class Data {
// public static synchronized Data getData() {
// return new Data(anInt++);
// }
//
private int number;
//
// private static volatile int anInt = 0;
public Data(int i) {
this.number = i;
}
@Override
public String toString() {
return "Data{" +
"number=" + number +
'}';
}
}
//Buffer类作为数据对象的持有者
//有一个取出数据的方法, 一个放入数据的方法, 用于同步
//还需要有可以取出几个和可以放入几个的标志
class Buffer {
//用工厂模式
public static Buffer getBuffer(int i) {
if (i <= 0) {
throw new RuntimeException("长度必须大于0");
}
return new Buffer(i);
}
private Data[] datas;
//用两个索引保存上次放入数据开始的地方, 和放完数据后的位置
private int lastIndex = 0;
private int currentIndex = 0;
//缓冲区长度
private int length;
//一开始有几个空槽可以放入数据, 设置为和长度一样
private int availableToPut;
//一开始有几个空槽可以取出数据, 设置为0
private int avaiableToGet = 0;
//私有构造器, 用工厂模式创建新的Buffer
private Buffer(int i) {
this.length = i;
this.availableToPut = length;
//初始化一个Data对象的数组, 全部初始化为null
datas = new Data[i];
for (int j = 0; j < length; j++) {
datas[j] = null;
}
}
//放入数据
public synchronized void putData(Data data) throws InterruptedException {
//如果可以放入数据的数量为0,就阻塞
while (availableToPut == 0) {
wait();
}
//如果成功通过阻塞, 说明可以放入数据了, 就放入数据, 同时更新相关指标
//更新currentIndex到最新的可以放入数据的位置
datas[currentIndex] = data;
currentIndex++;
currentIndex = currentIndex % length;
//更新可用槽位和可读槽位
availableToPut--;
avaiableToGet++;
//当前线程取出数据后唤醒其他放入数据,也就是生产者线程
notifyAll();
// System.out.println("成功放入了数据, 当前状态是: " + this);
}
public synchronized Data getData() throws InterruptedException {
while (avaiableToGet == 0) {
wait();
}
//从上次放入数据的地方取数据, 然后更新lastindex
Data result = datas[lastIndex];
lastIndex++;
lastIndex = lastIndex % length;
availableToPut++;
avaiableToGet--;
//当前线程取出数据后唤醒其他写入数据, 也就是消费者线程
notifyAll();
// System.out.println("成功取出了数据, 当前状态是: " + this);
return result;
}
@Override
public String toString() {
return "Buffer{" +
"datas=" + Arrays.toString(datas) +
", lastIndex=" + lastIndex +
", currentIndex=" + currentIndex +
", length=" + length +
", availableToPut=" + availableToPut +
", avaiableToGet=" + avaiableToGet +
", 尚未取出的数据等于=" + (currentIndex >= lastIndex ? currentIndex - lastIndex : currentIndex - lastIndex + length) +
'}';
}
}
class Producer implements Runnable {
private static volatile int number = 0;
private static Random random = new Random();
private final Buffer buffer;
Producer(Buffer buffer) {
this.buffer = buffer;
}
public static synchronized int getNumber() {
int temp = number;
number++;
return temp;
}
@Override
public void run() {
try {
while (true) {
Data newData = new Data(Producer.getNumber());
TimeUnit.MILLISECONDS.sleep(random.nextInt(30) * random.nextInt(20));
System.out.println("生产者尝试放入新数据:" + newData);
buffer.putData(newData);
}
} catch (InterruptedException e) {
System.out.println("生产者被中断了");
}
}
}
class Consumer implements Runnable {
private static Random random = new Random();
private final Buffer buffer;
Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(random.nextInt(30) * random.nextInt(20));
Data data = buffer.getData();
System.out.println("消费者取出了数据: " + data);
}
} catch (InterruptedException e) {
System.out.println("消费者被中断了");
}
}
}
同步队列
在自己编写的模型中, 实际上同步出现在对缓冲区的读写中, 如果有一个缓冲区已经实现了这种同步方法, 那么所有的线程之间操作缓冲区就可以了, 无需自行编写同步代码.
java.util.BlockingQueue提供了这个接口, 还有很多具体实现, 常用的有LinkedBlockingQueue, ArrayBlockingQueue.
这些同步队列都支持泛型, 平常使用的方法主要就是 .take() 返回放入的泛型类型对象, .put() 放入一个泛型类型的对象
这个其实就是缓冲区底层使用不同的数据结构, 外部封装好同步读写功能即可. 本质上和上边自行编写的Buffer类是一样的.
管道
在之前的IO里留下了Piped类型的输入和输出没有用到, 实际上管道就是一个阻塞队列, PipedWriter允许任务向管道写, PipedReader允许不同任务从同一个管道读取. 可以看成是一个生产者-消费者模型的封装好的解决方案.
一个简单的例子如下, sender 每半秒钟发一个字符到管道里, 另外一端的receiver进行输出, 核心是其中的创建管道的语句:
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PipedIO {
public static void main(String[] args) throws IOException, InterruptedException {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(sender);
executorService.execute(receiver);
TimeUnit.SECONDS.sleep(4);
executorService.shutdownNow();
}
}
class Sender implements Runnable {
private Random random = new Random(47);
//要向管道输出的任务创建一个PipedWriter()对象
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() {
return out;
}
public void run() {
try {
while (true) {
for (char c = 'A'; c <= 'z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(500);
}
}
} catch (IOException | InterruptedException e) {
System.out.println("IO错误或者被打断");
}
}
}
class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
//需要从管道中读取数据的任务需要以一个PipedWriter对象作为PipedReader对象的构造器参数
//获取之后相当于写入任务的out是管道的写入端, 这边是读取端
//
in = new PipedReader(sender.getPipedWriter());
}
public void run() {
try {
while (true) {
//这里直接调用read()方法即可, 如果读完了, 会阻塞.
char c = (char) in.read();
System.out.println(c);
}
} catch (IOException e) {
System.out.println("被打断");
}
}
}
如果要多线程读写管道, 注意一个PipedReader只能针对管道只能创建一次, 但是PipedReader对象可以复用, 如果要用多个线程去读管道, 需要修改成这样:
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PipedIO {
public static void main(String[] args) throws IOException, InterruptedException {
Sender sender = new Sender();
//创建一个共用的PipedReader对象
PipedReader in = new PipedReader(sender.getPipedWriter());
//让每个线程都使用这个共用对象即可
Receiver receiver0 = new Receiver(sender,0, in);
Receiver receiver1 = new Receiver(sender,1, in);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(sender);
executorService.execute(receiver0);
executorService.execute(receiver1);
TimeUnit.SECONDS.sleep(4);
executorService.shutdownNow();
}
}
//Sender代码不变, 省略, 这里要调整一下Receiver的构造器
class Receiver implements Runnable {
private PipedReader in;
private int number;
public Receiver(Sender sender, int i, PipedReader in) throws IOException {
this.in = in;
this.number = i;
//这个是关键, PipedReader以一个PipedWriter对象作为构造器参数, 获取一个Reader对象
// in = new PipedReader(sender.getPipedWriter());
}
public void run() {
try {
while (true) {
//这里直接调用read()方法即可, 如果读完了, 会阻塞.
char c = (char) in.read();
System.out.println("ID: " + number + " " + c);
}
} catch (IOException e) {
System.out.println("被打断");
}
}
}
管道是没有sleep()和wait()方法可供调用的, 只需要调用.read(), 如果没数据就会自动阻塞.