Java Reinforcement 09 并发 - 线程协作

Java Reinforcement 09 并发 - 线程协作

资源共享和终止的例子 线程状态 打断任务 线程协作 生产者消费者模型 同步队列 管道 之前的线程都是一个运行到自己结束的任务, 但有些时候, 多线程程序的各个线程是一直在工作的, 需要得到外部的消息来是否继续运行. 同时各个线程也会有共享数据, 比如一个统计多个门进入人数的程序: import ja

  1. 资源共享和终止的例子
  2. 线程状态
  3. 打断任务
  4. 线程协作
  5. 生产者消费者模型
  6. 同步队列
  7. 管道

之前的线程都是一个运行到自己结束的任务, 但有些时候, 多线程程序的各个线程是一直在工作的, 需要得到外部的消息来是否继续运行. 同时各个线程也会有共享数据, 比如一个统计多个门进入人数的程序:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrnamentalGarden {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            executorService.execute(new Entrance(i));
        }

        //主线程睡三秒之后关闭所有门
        TimeUnit.SECONDS.sleep(3);
        Entrance.cancel();
        executorService.shutdown();

        if (!executorService.awaitTermination(250, TimeUnit.MILLISECONDS)) {
            System.out.println("Some tasks are not terminated");
        }
        System.out.println("--------------------------------");
        System.out.println("Total: " + Entrance.getTotalCount());
        Entrance.showAllNumber();
    }


}

// 统计人数的对象
class Count {
    private int count = 0;

    private Random rand = new Random();

    public synchronized int increment() {
        int temp = count;
        if (rand.nextBoolean()) {
            Thread.yield();

        }
        return (count = ++temp);
    }

    public synchronized int getCount() {
        return count;
    }
}

//多个门, 用于统计每个门进出的人数, 用多线程操作
class Entrance implements Runnable {

    private static Random random = new Random();

    private static Count count = new Count();

    //创建一个列表用于存放所有的线程对象, 以控制该线程
    //这和CSAPP里的使用线程号本质上一样
    private static List<Entrance> entrances = new ArrayList<>();

    //每一个门都从0个人开始统计
    private int number = 0;
    //门的id
    private final int id;
    //门是不是关闭
    private static volatile boolean canceled = false;
    //设置门关闭
    public static void cancel() {
        canceled = true;
    }

    //初始化的时候, 把新对象加入到列表中
    public Entrance(int id) {
        this.id = id;
        entrances.add(this);
    }

    public void run() {
        //在每个线程中, 定期的进一个人, 然后打印
        while (!canceled) {
            synchronized (this) {
                //自己对象里的统计当前门进了多少人
                ++number;
            }
            //打印出目前的总人数
            System.out.println(this + "Total: " + count.increment());

            try{
                TimeUnit.MILLISECONDS.sleep(random.nextInt(10) * random.nextInt(20));

            } catch (InterruptedException e) {
                System.out.println("Sleep interrupted");
            }
        }
        //运行到这里门关闭了, 就打印
        System.out.println("Door " + id + " stopped");
    }

    public synchronized int getNumber() {
        return this.number;
    }

    @Override
    public synchronized String toString() {
        return "Entrance{" +
                "id=" + id +
                ", number=" + number +
                '}';
    }

    public static int getTotalCount() {
        return count.getCount();
    }

    public static int sumEntrances() {
        int sum = 0;
        for (Entrance entrance : entrances) {
            sum += entrance.getNumber();
        }
        return sum;
    }

    public static void showAllNumber() {
        for (Entrance entrance : entrances) {
            System.out.println(entrance);
        }
    }

}
这个程序是一个各个线程保存自己的数据并且协同主线程工作之外, 关键是了解如何来控制线程工作. 所有的线程对象共享一个变量 private static volatile boolean canceled = false; 用于控制门是否关闭. 由于基本类型赋值是原子操作, 也声明了volatile, 所以这里可以不同步, 当然为了不迷惑, 也可以加上同步. 这里的关键是红字的部分, 即利用了线程管理器的监听线程结束的情况. 在设置了门关闭之后, 不应该有继续存活的线程. executorService.awaitTermination(250, TimeUnit.MILLISECONDS)这个方法用于监听线程, 参数是时间, 在指定的时间之前, 所有的线程都被关闭, 返回true, 否则返回false. 这里利用的小技巧就是虽然所有的线程都退出了, 看上去已经不存在了, 实际上在类里边维护了一个静态变量, 用于存放所有的线程对象, 所以executorService.awaitTermination(250, TimeUnit.MILLISECONDS)可以正常工作.

线程状态

一个线程处于四种状态之一
  1. 新建, 只在新创建的时候, 会短暂的处于这个状态. 线程下一步将会变成可运行或者阻塞
  2. 就绪, 可以运行, 也可以不运行, 只要分配了时间片就可以运行
  3. 阻塞, 调度器会忽略这个线程, 直到其从阻塞进入就绪状态
  4. 死亡, 不可再调度, 也不会再得到CPU时间, 任务结束通常是run()方法返回, 但也可以被其他的方式结束
进入阻塞有如下几种情况:
  1. 自己调用sleep()
  2. 调用wait()挂起线程, 线程会等待notify()或者notifyAll()后进入就绪状态
  3. 在I/O上阻塞
  4. 等待获取锁
现在我们要来看, 如何让线程在run()返回之前结束其任务, 这样才能让线程协同工作

打断任务 - 这里的打断指的是让任务结束阻塞状态, 不是强行中止任务.

本身Thread类就有.interrupt()方法, 持有线程对象就可以调用这个方法强行中断. 如果不具体操作每个对象, 也可以通过Executor来执行所有操作. 如果调用 Executor.shutdownNow(), 会发送一个interrupt()给它启动的所有线程. 如果只希望终止一个, 可以使用submit(), 会返回一个Future对象, 在Future对象上调用.cancel()就可以中断特定的任务.
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Interrupting  {

    private static ExecutorService executorService = Executors.newCachedThreadPool();

    static void test(Runnable r) throws InterruptedException {
        Future<?> f = executorService.submit(r);
        TimeUnit.MILLISECONDS.sleep(1);
        System.out.println("Interrupting " + r.getClass().getName());
        //传入true表示打断这个线程
        f.cancel(true);
        System.out.println("Interrupting sent to " + r.getClass().getName());
    }

    public static void main(String[] args) throws InterruptedException {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));

        executorService.shutdown();
    }


}

class SleepBlocked implements Runnable {

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(100);

        } catch (InterruptedException e) {
            System.out.println("Interrupted");
            System.out.println("Intterupted 之后执行的代码");
        }
        System.out.println("正常退出run()");
    }
}

class IOBlocked implements Runnable {
    private InputStream inputStream;

    public IOBlocked(InputStream inputStream) {
        this.inputStream = inputStream;
    }

    public void run() {
        try {
            System.out.println("Waiting for input: ");
            inputStream.read();
        } catch (IOException e) {
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("Interrupted from blockedI/O");
            } else {
                throw new RuntimeException(e);
            }
        }
        System.out.println("Exiting IOBlocked.run()");

    }

}

class SynchroizedBlocked implements Runnable {
    public synchronized void f() {
        while (true) {
            Thread.yield();
        }
    }

    public SynchroizedBlocked() {
        new Thread() {
            public void run() {
                f();
            }
        }.start();

    }

    public void run() {
        System.out.println("Trying to call f()");
        f();
        System.out.println("Exiting SynchroizedBlocked.run()");
    }
}
实际上无法打断阻塞在I/O的线程, 因为即使打断了, 还是在等待I/O.

线程协作

之前访问共享变量, 是让线程可以安全的操作同一个内容, 现在线程还可以互相协作指的是等待, 通知, 唤醒等. 可以把每一个线程当成一个微型的程序, 这些程序之间可以互相进行通信, 叫醒和睡眠等. 本质上就是采取一种信号的方式, 让线程之间可以互相响应来做事情. 先要看的是wait()和notifyAll(). wait()函数在调用完毕之后, 调用这个函数的线程是无法自己再执行任务的, 必须要等待其他线程将其激活. 等待的信号就是notifyAll(). sleep()和yield()都不会释放锁, 只会很大概率让调度器执行其他线程. 而 wait() 会释放锁, 这是一个很关键的特性. .wait()可以接受毫秒数作为参数, 表示挂起的时间, 作用和sleep()有些类似(但是会释放锁). sleep()需要打断, 而.wait()是等待notifyAll()消息 wait()如果不加参数, 就是无限等待下去, 直到接收到notify()或者notifyAll()消息. 还要注意的是, wait(), notify(), notifyAll()都是Object的方法之一, 而不是Thread的方法. 而且必须在同步方法内部才能调用这三个方法, 也即调用者三个方法的线程已经进入到了同步方法内部, 也即已经获取了锁.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WaxOnMatic {

    public static void main(String[] args) throws InterruptedException {
        Car car = new Car();

        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new WaxOn(car));
        executorService.submit(new WaxOff(car));

        TimeUnit.SECONDS.sleep(10);
        //前边学的, 给所有线程发送interrupt信号, 我们靠这个机制结束线程
        executorService.shutdownNow();
    }
}


class Car {

    //有了四个方法之后, 打蜡的任务需要使用两个方法, 即打完腊通知其他线程干活, 以及检查打蜡标记, 如果已经打完腊, 就继续等待
    //抛光的任务则需要使用抛完光再通知其他线程干活, 以及检查抛光标记, 已经抛光过就一直等待

    //打蜡标记同时也是抛光标记, 为false的是表示抛光完成, 为true的时候表示打完腊
    private boolean waxOn = false;

    public synchronized void waxed() {
        //干完打蜡就通知所有线程干活
        waxOn = true;
        notifyAll();
    }

    public synchronized void buffed() {
        //干完抛光就通知其他线程
        waxOn = false;
        notifyAll();
    }

    public synchronized void waitForWaxing() throws InterruptedException {
        //如果没打蜡就继续等待
        while (!waxOn) {
            wait();
        }
    }

    public synchronized void waitForBuffed() throws InterruptedException {
        //如果没抛光就继续等待
        while (waxOn) {
            wait();
        }
    }
}

class WaxOn implements Runnable {
    private Car car;

    public WaxOn(Car car) {
        this.car = car;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.println("开始打蜡.");
                TimeUnit.MILLISECONDS.sleep(500);
                //打蜡
                car.waxed();
                //等抛光
                car.waitForBuffed();

            }
            //用了try-catch, 打断任务实际上变成了终止线程的工具
        } catch (InterruptedException e) {
            System.out.println("打蜡任务被中断 Interrupted");

        }
        System.out.println("结束打蜡任务");
    }
}

class WaxOff implements Runnable {
    private Car car;

    WaxOff(Car car) {
        this.car = car;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                //注意这里的顺序, 先等待, 再执行, 多个任务的话, 要有一个先执行并且改变标记, 其他的任务等待标记改变之后再来
                car.waitForWaxing();
                System.out.println("开始抛光");
                TimeUnit.MILLISECONDS.sleep(300);
                car.buffed();
            }
        } catch (InterruptedException e) {
            System.out.println("抛光任务被中断 Interrupted");

        }
        System.out.println("结束抛光任务");
    }

}
这个例子有如果设置多个打蜡和抛光的线程, 也是可以工作的, 每次只有拿到锁的线程进行工作, 然后再通知其他线程来工作. 其实很像CSAPP上的生产者和消费者模型. 等待的线程一般都需要使用检查标记和while循环来不断等待, 这也是多线程的套路了, 反正阻塞了不会占用太多资源. 只是要注意并不像底层的信号量使用那么直接, 而是要在共享变量或者说要操作的对象上设置同步方法,然后在其中设置wait()和notify(). 还有要注意的就是, 这里都使用了同一个Car对象, 而Car对象中的 sync 的锁就是car对象本身, notify()和notifyAll()只能唤醒等待同一个锁的对象, 而不是全部的对象. 当然这一点也是不言自明的.

生产者消费者模型

终于到了Java 中的生产者消费者模型, 在CSAPP底层中, 使用了一个数组和三个信号量来判断, 其中一个信号量是用于同步的互斥信号量, 另外两个信号量则用作有多少个数据和空槽可用的标记. 来看看Java 的生产者和消费者模型:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

//餐馆实际上是Meal的容器, 两个线程使用同一个餐馆对象, 其中持有Meal对象, 对于容器访问, 需要加锁才行
public class Restaurant {
    Meal meal = null;
    ExecutorService exec = Executors.newCachedThreadPool();

    //这个很有意思, 直接以当前对象传给Consumer
    final Consumer consumer = new Consumer(this);
    final Chef chef = new Chef(this);

    public Restaurant() {
        exec.execute(chef);
        exec.execute(consumer);

    }

    public static void main(String[] args) {
        new Restaurant();
    }

}


//这个类就是要消费的数据.
class Meal {
    private final int number;

    public Meal(int number) {
        this.number = number;
    }

    public int getNumber() {
        return number;
    }

    @Override
    public String toString() {
        return "Meal{" +
                "number=" + number +
                '}';
    }
}

//消费者, 就是把肉拿走的人
class Consumer implements Runnable {
    private Restaurant restaurant;

    public Consumer(Restaurant restaurant) {
        this.restaurant = restaurant;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    //餐馆里没有肉, 就等待
                    while (restaurant.meal == null) {
                        wait();
                    }
                }
                System.out.println("Consumer got a meal");
                //消费完肉之后, 通知厨子做饭
                synchronized (restaurant.chef) {
                    restaurant.meal = null;
                    restaurant.chef.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            System.out.println("中断消费者");
        }
    }
}

class Chef implements Runnable {
    private Restaurant restaurant;

    private int count = 0;

    public Chef(Restaurant restaurant) {
        this.restaurant = restaurant;
    }

    public void run(){
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (restaurant.meal != null) {
                        wait();
                    }
                }
                if (++count == 10) {
                    System.out.println("生产完了10顿饭.");
                    restaurant.exec.shutdownNow();
                }
                System.out.println("开始做饭");
                synchronized (restaurant.consumer) {
                    restaurant.meal = new Meal(count);
                    restaurant.consumer.notifyAll();
                }
                TimeUnit.SECONDS.sleep(1);
            }

        } catch (InterruptedException e) {
            System.out.println("中断厨子");
        }
    }
}
这个模型是一个线程的消费者和一个线程的生产者, 餐馆是数据对象Meal的持有者. 这里由于是两个单独的线程, 所以在生产和消费数据, 也就是读写meal的时候, 用自己加锁, 而在唤醒另一个生产者/消费者的时候, 用对应的对象加锁. 如果meal是一个容器, 还可以全部都对meal进行加锁, 在其中设置一些变量用来取数据.可以很容易的就把CSAPP的模型移植到Java中来, 只要对象监视器都是同一个就可以了. 然后自己根据练习24的要求, 写了个内部是定长数组的缓冲区队列持有数据, 然后有多个消费者和生产者对象的模型:
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 解决带有缓冲区的单个生产者和消费者的问题
 * 初步想法是弄带有ID标识的数据对象
 * 然后弄一个缓冲区对象Buffer持有数据对象
 * Buffer中的放入和取出数据需要同步, 然后用内部的指针来保存当前的状态
 */

//
public class Ex24 {

    public static void main(String[] args) throws InterruptedException {
        Buffer buffer = Buffer.getBuffer(20);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 1; i++) {
            executorService.execute(new Producer(buffer));

        }
        for (int j = 0; j < 3; j++) {
            executorService.execute(new Consumer(buffer));
        }

        TimeUnit.SECONDS.sleep(3);
        executorService.shutdownNow();
        System.out.println(buffer);
    }
}

//这个作为数据对象
class Data {

//    public static synchronized Data getData() {
//        return new Data(anInt++);
//    }
//
    private int number;
//
//    private static volatile int anInt = 0;

    public Data(int i) {
        this.number = i;
    }

    @Override
    public String toString() {
        return "Data{" +
                "number=" + number +
                '}';
    }
}

//Buffer类作为数据对象的持有者
//有一个取出数据的方法, 一个放入数据的方法, 用于同步
//还需要有可以取出几个和可以放入几个的标志
class Buffer {

    //用工厂模式
    public static Buffer getBuffer(int i) {
        if (i <= 0) {
            throw new RuntimeException("长度必须大于0");
        }
        return new Buffer(i);
    }

    private Data[] datas;

    //用两个索引保存上次放入数据开始的地方, 和放完数据后的位置
    private int lastIndex = 0;
    private int currentIndex = 0;

    //缓冲区长度
    private int length;
    //一开始有几个空槽可以放入数据, 设置为和长度一样
    private int availableToPut;
    //一开始有几个空槽可以取出数据, 设置为0
    private int avaiableToGet = 0;

    //私有构造器, 用工厂模式创建新的Buffer
    private Buffer(int i) {
        this.length = i;
        this.availableToPut = length;
        //初始化一个Data对象的数组, 全部初始化为null
        datas = new Data[i];
        for (int j = 0; j < length; j++) {
            datas[j] = null;
        }
    }

    //放入数据
    public synchronized void putData(Data data) throws InterruptedException {

        //如果可以放入数据的数量为0,就阻塞
        while (availableToPut == 0) {
            wait();
        }
        //如果成功通过阻塞, 说明可以放入数据了, 就放入数据, 同时更新相关指标
        //更新currentIndex到最新的可以放入数据的位置
        datas[currentIndex] = data;
        currentIndex++;
        currentIndex = currentIndex % length;

        //更新可用槽位和可读槽位
        availableToPut--;
        avaiableToGet++;
        //当前线程取出数据后唤醒其他放入数据,也就是生产者线程
        notifyAll();
//        System.out.println("成功放入了数据, 当前状态是: " + this);
    }

    public synchronized Data getData() throws InterruptedException {
        while (avaiableToGet == 0) {
            wait();
        }

        //从上次放入数据的地方取数据, 然后更新lastindex
        Data result = datas[lastIndex];
        lastIndex++;
        lastIndex = lastIndex % length;
        availableToPut++;
        avaiableToGet--;
        //当前线程取出数据后唤醒其他写入数据, 也就是消费者线程
        notifyAll();
//        System.out.println("成功取出了数据, 当前状态是: " + this);
        return result;
    }

    @Override
    public String toString() {
        return "Buffer{" +
                "datas=" + Arrays.toString(datas) +
                ", lastIndex=" + lastIndex +
                ", currentIndex=" + currentIndex +
                ", length=" + length +
                ", availableToPut=" + availableToPut +
                ", avaiableToGet=" + avaiableToGet +
                ", 尚未取出的数据等于=" + (currentIndex >= lastIndex ? currentIndex - lastIndex : currentIndex - lastIndex + length) +
                '}';
    }
}

class Producer implements Runnable {

    private static volatile int number = 0;

    private static Random random = new Random();

    private final Buffer buffer;

    Producer(Buffer buffer) {
        this.buffer = buffer;
    }

    public static synchronized int getNumber() {
        int temp = number;
        number++;
        return temp;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Data newData = new Data(Producer.getNumber());
                TimeUnit.MILLISECONDS.sleep(random.nextInt(30) * random.nextInt(20));
                System.out.println("生产者尝试放入新数据:" + newData);
                buffer.putData(newData);
            }
        } catch (InterruptedException e) {
            System.out.println("生产者被中断了");
        }

    }
}

class Consumer implements Runnable {
    private static Random random = new Random();

    private final Buffer buffer;

    Consumer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        try {
            while (true) {
                TimeUnit.MILLISECONDS.sleep(random.nextInt(30) * random.nextInt(20));
                Data data = buffer.getData();
                System.out.println("消费者取出了数据: " + data);
            }

        } catch (InterruptedException e) {
            System.out.println("消费者被中断了");
        }
    }
}

同步队列

在自己编写的模型中, 实际上同步出现在对缓冲区的读写中, 如果有一个缓冲区已经实现了这种同步方法, 那么所有的线程之间操作缓冲区就可以了, 无需自行编写同步代码. java.util.BlockingQueue提供了这个接口, 还有很多具体实现, 常用的有LinkedBlockingQueue, ArrayBlockingQueue. 这些同步队列都支持泛型, 平常使用的方法主要就是 .take() 返回放入的泛型类型对象, .put() 放入一个泛型类型的对象 这个其实就是缓冲区底层使用不同的数据结构, 外部封装好同步读写功能即可. 本质上和上边自行编写的Buffer类是一样的.

管道

在之前的IO里留下了Piped类型的输入和输出没有用到, 实际上管道就是一个阻塞队列, PipedWriter允许任务向管道写, PipedReader允许不同任务从同一个管道读取. 可以看成是一个生产者-消费者模型的封装好的解决方案. 一个简单的例子如下, sender 每半秒钟发一个字符到管道里, 另外一端的receiver进行输出, 核心是其中的创建管道的语句:
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PipedIO {

    public static void main(String[] args) throws IOException, InterruptedException {
        Sender sender = new Sender();
        Receiver receiver = new Receiver(sender);

        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.execute(sender);
        executorService.execute(receiver);

        TimeUnit.SECONDS.sleep(4);
        executorService.shutdownNow();
    }
}


class Sender implements Runnable {

    private Random random = new Random(47);
    //要向管道输出的任务创建一个PipedWriter()对象
    private PipedWriter out = new PipedWriter();

    public PipedWriter getPipedWriter() {
        return out;
    }

    public void run() {
        try {
            while (true) {
                for (char c = 'A'; c <= 'z'; c++) {
                    out.write(c);
                    TimeUnit.MILLISECONDS.sleep(500);
                }
            }
        } catch (IOException | InterruptedException e) {
            System.out.println("IO错误或者被打断");
        }
    }
}

class Receiver implements Runnable {

    private PipedReader in;

    public Receiver(Sender sender) throws IOException {
        //需要从管道中读取数据的任务需要以一个PipedWriter对象作为PipedReader对象的构造器参数
        //获取之后相当于写入任务的out是管道的写入端, 这边是读取端
        //
        in = new PipedReader(sender.getPipedWriter());
    }

    public void run() {
        try {
            while (true) {
                //这里直接调用read()方法即可, 如果读完了, 会阻塞.
                char c = (char) in.read();
                System.out.println(c);
            }
        } catch (IOException e) {
            System.out.println("被打断");
        }
    }
}
如果要多线程读写管道, 注意一个PipedReader只能针对管道只能创建一次, 但是PipedReader对象可以复用, 如果要用多个线程去读管道, 需要修改成这样:
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PipedIO {

    public static void main(String[] args) throws IOException, InterruptedException {
        Sender sender = new Sender();

        //创建一个共用的PipedReader对象
        PipedReader in = new PipedReader(sender.getPipedWriter());

        //让每个线程都使用这个共用对象即可
        Receiver receiver0 = new Receiver(sender,0, in);
        Receiver receiver1 = new Receiver(sender,1, in);

        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.execute(sender);
        executorService.execute(receiver0);
        executorService.execute(receiver1);

        TimeUnit.SECONDS.sleep(4);
        executorService.shutdownNow();
    }
}

//Sender代码不变, 省略, 这里要调整一下Receiver的构造器
class Receiver implements Runnable {

    private PipedReader in;

    private int number;

    public Receiver(Sender sender, int i, PipedReader in) throws IOException {
        this.in = in;
        this.number = i;
        //这个是关键, PipedReader以一个PipedWriter对象作为构造器参数, 获取一个Reader对象
//        in = new PipedReader(sender.getPipedWriter());
    }

    public void run() {
        try {
            while (true) {
                //这里直接调用read()方法即可, 如果读完了, 会阻塞.
                char c = (char) in.read();
                System.out.println("ID: " + number + " " + c);
            }
        } catch (IOException e) {
            System.out.println("被打断");
        }
    }

}
管道是没有sleep()和wait()方法可供调用的, 只需要调用.read(), 如果没数据就会自动阻塞.
LICENSED UNDER CC BY-NC-SA 4.0
Comment