并发 – Java并发 NIO – 异步API:IO多路复用

并发 – Java并发 NIO – 异步API:IO多路复用

经常看到多路复用

IO多路复用通过NIO的Selector相关API来实现, 先看一下基础的操作, 然后编写两个服务器来进行对比. 这里还涉及到Java的网络编程, 在后边也要专门看一下了. 明天就要发布高考分数了, 这里还是大声疾呼一下, 不管多少分, 能报名计算机专业的报名计算机, 报不上和考文科的, 暑假里就报班学Java, 别等了, 真的.
  1. IO多路复用的基础结构
  2. 不使用NIO的多线程Echo服务器
  3. 将多线程Echo服务器使用NIO改造
  4. 总结

IO多路复用的基础结构

IO多路复用的基础结构, 就是之前提到过的NIO中的Selector相关的API. 包括:
  1. 支持Selector特性的Channel:SelectableChannel
  2. Seletor, 用于监听多路IO的选择器, 其名字来源于操作系统的系统调用select()函数
  3. SelectionKey, 用于让监听多路IO的选择器知道监听哪一路Channel(文件描述符)的对象, 代表被选择器监听的某一个Channel.
在之前看过操作系统, Java里的操作逻辑实际上和C语言使用系统调用是一样的:
  1. 创建Selector对象和需要被其监听的一系列Channel对象
  2. 将Channel对象注册到Selector对象中, 每次注册都可以获取一个独立的SelectionKey, 用于标识某个Channel
  3. 调用select()方法, 这个方法会阻塞, 直到被监听的Channel中有至少一个可用, 然后返回所有可用的Channel
  4. 遍历Channel, 对每个Channel进行处理
所以这个逻辑和系统调用select()返回可用的文件描述符的集合如出一辙, 只不过因为是面向对象包了很多东西, 所以在使用的时候还有一些小细节需要注意. 来看看具体的代码吧. 首先是需要知道哪些Channel支持Selector特性, 通过查看文档可以知道, java.nio.channels中有一个类SelectableChannel, 是支持Selector(can be multiplexed via a Selector)的. 然后这个类有一个子类也是一个抽象类AbstractSelectableChannel, 这个抽象类有五个实现类: DatagramChannel, Pipe.SinkChannel, Pipe.SourceChannel, ServerSocketChannel, SocketChannel. 对于网络IO来说, 我们关心其中的DatagramChannel, ServerSocketChannel, SocketChannel, 也就是UDP端, TCP服务端和TCP客户端. 然后是Selector类, java.nio.Channels中的Selector类, 有一个静态方法.open(), 用来创建一个Selector. 就使用这个方法来创建Selector. 有了Channel和Selector之后, 就是将Channel注册到Selector上, 也就是调用channel.ssc.register(selector, SelectionKey.OP_ACCEPT);. 这里的第二个参数指的是监听什么事件, 对于Socket通信来说唯一可以监听的就是ACCEPT, 也就是连接有数据到达的事件. 这个注册会返回一个SelectionKey对象, 通过这个对象就可以知道是哪个Channel出现事件. 这样三大组件即Channel, Selector, SelectorKey都知道如何创建和使用了, 下边就来看一下细节: 第一步, 准备若干个Channel
//新创建一个ServerSocketChannel, 也就是TCP服务端
ServerSocketChannel serverSocketChannelAt8000 = ServerSocketChannel.open();

//很重要, 将其设置为异步模式, 否则还是同步模式
serverSocketChannelAt8000.configureBlocking(false);

//只有了TCP服务端Channel, 还需要从中获取其内部包装的ServerSocket对象用来绑定端口
ServerSocket socket = serverSocketChannelAt8000.socket();
InetSocketAddress address = new InetSocketAddress(8000);
socket.bind(address);

//继续创建一个绑定7000, 8888端口的channel
ServerSocketChannel serverSocketChannelAt7000 = ServerSocketChannel.open();
serverSocketChannelAt7000.socket().bind(new InetSocketAddress(7000));
serverSocketChannelAt7000.configureBlocking(false);

ServerSocketChannel serverSocketChannelAt8888 = ServerSocketChannel.open();
serverSocketChannelAt8888.socket().bind(new InetSocketAddress(8888));

serverSocketChannelAt8888.configureBlocking(false);
到这里其实可以在后边加一句serverSocketChannel.accept();, 运行会发现程序直接结束, 这是因为设置了异步, 并没有等待TCP连接进入. 如果改成同步, 则serverSocketChannel.accept();这句就会一直阻塞到有访问进来. 第二步, 配置Selector和进行注册绑定.
//创建一个selector用于监听
Selector selector = Selector.open();

//channel调用自己的register方法向selector中注册, 并得到一个SelectionKey对象, 当然此时这个Key没有什么用, 实际用的是每个连接进来的描述符对应的Key
//将三个Channel都注册到selector中
SelectionKey key8000 = serverSocketChannelAt8000.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key7000 = serverSocketChannelAt7000.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key8888 = serverSocketChannelAt8888.register(selector, SelectionKey.OP_ACCEPT);
第三步, 启动服务器的主循环, 监听所有Channel, 然后每次对就绪的Channel进行操作:
while (true) {

    //Selector进行监听, 返回一个int, 这个指有几个Channel出现了事件, 即IO可用
    //注意这个方法是阻塞的, 也就是如果程序执行到了此行之后的语句, 一定会有可用的IO出现
    int number = selector.select();

    //获取所有可用的keys, 类似于获取所有就绪的Channel
    Set<SelectionKey> availableKeys = selector.selectedKeys();

    //获取迭代器, 用于所有就绪的Channel
    Iterator<SelectionKey> iterable = availableKeys.iterator();

    while (iterable.hasNext()) {
        SelectionKey selectionKey = iterable.next();
        //这里要注意, 处理完一个之后, 需要立刻将其从迭代中去除, 否则下一次还会继续监听到这个端口
        iterable.remove();
        if (selectionKey.isAcceptable()) {
            //从SelectionKey中获取channel对象, 因为知道类型, 所以强制转换, 然后可以从其中获取连接的信息
            ServerSocketChannel newChannel = (ServerSocketChannel) selectionKey.channel();
            System.out.println("接受连接来自: "+ newChannel.socket().getLocalPort());
            //从ServerSocketChannel中获取SocketChannel. 也就是TCP连接
            SocketChannel socketChannel = newChannel.accept();


            //将内容一次性读入到2048长度的字节中
            ByteBuffer byteBufferIn = ByteBuffer.allocate(2048);
            System.out.println(socketChannel.read(byteBufferIn));
            byteBufferIn.flip();

            //这一块也是NIO的用法, 和Charset相关
            //将其按照UTF-8进行解码然后放到CharBuffer中, 打印出来
            Charset utf8 = Charset.forName("UTF-8");
            CharsetDecoder decoder = utf8.newDecoder();

            CharBuffer charBufferin = decoder.decode(byteBufferIn);

            System.out.println(charBufferin.array());

            //直接关闭连接
            socketChannel.close();

        } else if (selectionKey.isValid() && selectionKey.isReadable()) {
            System.out.println("可以读");
        } else if (selectionKey.isValid() && selectionKey.isWritable()) {
            System.out.println("可以写");
        }
    }
}
然后就可以启动程序了, 启动之后, 这个selector会同时监听7000,8000,8888端口的三个Channel, 哪个有连接进来, 就会创建连接然后打印出传来的内容. 用浏览器访问http://localhost:8888/, http://localhost:8000/, http://localhost:7000/. 不管哪一个访问进来, selector都可以创建连接. 这样一个线程就可以监听很多IO通道, 而不用每来一个IO都用一个线程去响应. 这里还是使用了单线程进行处理, 实际上, 还可以在每一个SocketServerChannel出现ON_ACCEPT事件的时候, 将每一个Socket连接交给一个线程去处理, 这样就是一个线程监听多个Channel, 然后进行分发, 效率更高. 而且不会等待IO.

不使用NIO的多线程Echo服务器

这一节跟着书上写一个多线程不使用NIO的服务器看看, 看来Java 网络编程得马上补上了. 计算机网络这本书还没看, 好在马上就要把并发干完了. 先是服务器, 这是不使用NIO的传统多线程服务器, 每进来一个新连接, 就将这个连接交给一个新线程去处理, 这也是学C语言的时候写过的老套路了, 直接上代码:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MuiltThreadEchoServer {

    //服务器端口与构造器
    int port;

    public MuiltThreadEchoServer(int port) {
        this.port = port;
    }

    //服务器的线程池
    private final ExecutorService pool = Executors.newCachedThreadPool();

    //处理每个客户端连接的线程类
    static class HandleMsg implements Runnable {
        //私有变量与构造器, 保存当前Socket连接
        Socket clientSocket;

        public HandleMsg(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }

        //核心的run()方法
        @Override
        public void run() {
            BufferedReader reader = null;
            PrintWriter writer = null;

            //设置好reader和writer
            try {
                reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                writer = new PrintWriter(clientSocket.getOutputStream(), true);

                //尝试读出客户端发来的内容, 然后原样写入
                String inputLine = null;
                long b = System.currentTimeMillis();
                while ((inputLine = reader.readLine()) != null) {
                    writer.println(inputLine);
                }
                long e = System.currentTimeMillis();
                System.out.println("spend: " + (e - b) + " ms");

            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (writer != null) {
                    writer.close();
                }
                try {
                    clientSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }
    }

    //启动服务器
    public void start() {
        ServerSocket echoServer = null;
        Socket clientSocket = null;
        //创建服务端Socket, 绑定8000端口, 启动失败直接退出
        try{
            echoServer = new ServerSocket(port);
        } catch (IOException e) {
            System.out.println("服务器启动失败");
            System.exit(1);
        }

        while (true) {
            try {
                //新进来连接就创建一个新线程交给线程池
                clientSocket = echoServer.accept();
                System.out.println(clientSocket.getRemoteSocketAddress() + " connected.");
                pool.execute(new HandleMsg(clientSocket));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

}
然后是客户端, 这里的客户端就比较搞, 连接成功之后不会释放连接, 而是故意一个一个字的输入, 这样每一个新创建的线程就会卡在等待数据上很久.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;

public class Client implements Runnable {

    @Override
    public void run() {
        Socket client = null;
        PrintWriter writer = null;
        BufferedReader reader = null;
        try {
            client = new Socket();
            client.connect(new InetSocketAddress("localhost", 8000));
            writer = new PrintWriter(client.getOutputStream());

            //这里把要写入的信息分6次发送, 发送间隔一秒, 一次发送一个字符
            String msg = "Hello!";
            for (int i = 0; i < msg.length(); i++) {
                Thread.sleep(1000);
                writer.print(msg.charAt(i));
            }
            //要写一个println(), 相当于对面收到EOF, 才能完成一次发送
            writer.println();
            writer.flush();

            reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
            System.out.println("from server: " + reader.readLine());

        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (writer != null) {
                writer.close();
            }
            if (client != null) {
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //这里启动10个线程去连接, 可以发现都连接成功, 但是都耗时6秒钟. 有没有办法让线程不等待呢.
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(new Client()).start();

        }
    }
}

将多线程Echo服务器使用NIO改造

上边的那个例子的核心在于, 如果有大量线程连接之后不做什么事情, 那么服务器没有从客户端连接中获取数据, 无法做使用, 同时又不能杀死连接线程, 就会导致同一时刻有大量的线程被创建但是在等待IO, 会拖慢系统. 现在就用全新的NIO来改造线程服务器, 不发给我数据, 我就不等待, 来了一个数据, 响应一次, 不来, 就当做没事一样. 根据上边的NIO知识, 监听所有可用的I/O对象, 也就是客户端连接, 只需要使用一个线程就行了, 然后每有客户端连接进来数据, 就交给一个线程去处理就行. 如果没有新进来的数据, 那么也没有线程被创建出来进行工作.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NIOEchoServer {

    private Selector selector;
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final Map<Socket, Long> time = new HashMap<>();

    int port;

    public NIOEchoServer(int port) {
        this.port = port;
    }

    public void startServer() throws IOException {
        //创建Selector
        Selector selector = Selector.open();
        //创建TCP服务器Channel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        //绑定到端口
        serverSocketChannel.socket().bind(new InetSocketAddress("localhost", port));
        //这个套路都知道了, 注册给selector
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        //服务器主循环
        while (true) {
            //阻塞在此, 监听channel
            selector.select();

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            long e = 0;
            
            //这里其实就一个Channel, 只要有连接或者发送信息, 就是出现响应
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                
                //这个判断是有新连接进来
                if (key.isAcceptable()) {
                    doAccept(key);
                }
                
                //可读, 相当于客户端发送信息过来, 记录时间后执行读取动作
                else if (key.isReadable() && key.isValid()) {
                    //向记录时间的Map中写入时间
                    if (!time.containsKey(((SocketChannel) key.channel()).socket())) {
                        time.put(((SocketChannel) key.channel()).socket(), System.currentTimeMillis());
                    }
                    doRead(key);
                    
                //可写, 相当于客户端发完了信息, 在等待响应
                } else if (key.isWritable() && key.isValid()) {
                    doWrite(key);
                    e = (System.currentTimeMillis());
                    //从记录时间的map中取出对应的时间, 然后比较一下耗时
                    long b = time.remove(((SocketChannel) key.channel()).socket());
                    System.out.println("耗时: " + (e - b) + " ms");
                    
                }
            }
        }
    }

    private void doWrite(SelectionKey key) {
    }

    private void doRead(SelectionKey key) {
    }

    private void doAccept(SelectionKey key) {
    }
}
这个逻辑基本上和非NIO的一样, NIO的selector就监听了一个端口的SocketServerChannel, 具体还是要放到方法里去执行从SocketServerChannel获取SocketChannel的方法. 下边来实现一下三个方法, 首先是doAccept():
private void doAccept(SelectionKey key) throws IOException {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
    SocketChannel clientSocket = serverSocketChannel.accept();

    //凡是支持selectableChannel的, 都要设置成非阻塞
    clientSocket.configureBlocking(false);

    //这里很关键, 把获取的具体ClientChannel, 也注册到同一个selector上, 注册事件为可读, 此时已经连接完毕, 下一个事件就是可读了.
    //这样同一个selector, 就可以既监听TCP进来连接, 也可以监听已经创建的连接了, 前边的startServer()中的方法的三种分支就有用武之地了.
    SelectionKey clientKey = clientSocket.register(selector, SelectionKey.OP_READ);

    //由于客户端不是一次性发完全部数据, 而是慢慢发送, 因此给key attach一个对象, 就像是MVC里的model一样, 以后再使用这个key, 都可以共享这个数据对象
    ClientData data = new ClientData();
    clientKey.attach(data);

    //doAccept()的作用就是获取TCP连接, 将连接加入到Selector的监听中, 然后附加上一个数据对象用于和客户端的通信, 之后就结束了.
}
然后是doRead()方法, 这个是指一个SocketChannel可读之后, 会被Selector监听到, 然后在主循环中交给doRead()方法来使用:
private void doRead(SelectionKey key) throws IOException {
    //注意,  进到这里的SelectionKey不是ServerSocketChannel的Key, 而是SocketChannel的Key, 这是因为能够监听到READ事件的是在Accept()方法里注册SocketChannel的Key
    SocketChannel clientSocket = (SocketChannel) key.channel();
    //创建8K大小的缓冲区
    ByteBuffer byteBuffer = ByteBuffer.allocate(8192);

    int len;

    //尝试读取
    try {
        len = clientSocket.read(byteBuffer);
        //如果读取结果是-1, 说明读取完毕, 直接结束
        if (len < 0) {
            disconnect(key);
            return;
        }
    } catch (IOException e) {
        //出错也关闭channel
        System.out.println("Failed to read from client.");
        e.printStackTrace();
        disconnect(key);
    }

    byteBuffer.flip();

    //这里使用了一个处理消息的类, 交给线程池进行处理
    pool.execute(new HandleMsg(key, byteBuffer));

}
来马上看一下数据对象ClientData和对应的HandleMsg类:
import java.nio.ByteBuffer;
import java.util.LinkedList;

public class ClientData {

    private LinkedList<ByteBuffer> outQueue;

    public ClientData() {
        outQueue = new LinkedList<>();
    }

    public LinkedList<ByteBuffer> getOutQueue() {
        return outQueue;
    }

    public void enqueue(ByteBuffer buffer) {
        outQueue.addFirst(buffer);
    }
}
private class HandleMsg implements Runnable {
    ByteBuffer byteBuffer;
    SelectionKey key;
    public HandleMsg(SelectionKey key, ByteBuffer byteBuffer) {
        this.key = key;
        this.byteBuffer = byteBuffer;
    }

    @Override
    public void run() {
        ClientData data = (ClientData) key.attachment();
        data.enqueue(byteBuffer);
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        selector.wakeup();
    }
}
数据对象内部是一个队列, 保存每次从客户端读到的8K长度的ByteBuffer, HandleMsg则是获取数据对象然后把读到的消息放进去. 因为读过了消息, 所以将那个key的感兴趣事件从只是READ变成READ+WRITE. 然后强制去唤醒selector, 如果selector阻塞, 这个时候会重新再开始一轮监听, 此时就可以监听key新增的WRITE事件了. 最后是doWrite()函数, 与doRead()一样, 进来的key是SocketChannel的key, 而不是ServerSocketChannel的key:
private void doWrite(SelectionKey key) throws IOException {
    //注意,  进到这里的SelectionKey不是ServerSocketChannel的Key, 而是SocketChannel的Key, 这是因为能够监听到READ事件的是在Accept()方法里注册SocketChannel的Key
    SocketChannel clientSocket = (SocketChannel) key.channel();
    ClientData data = (ClientData) key.attachment();

    LinkedList<ByteBuffer> buffers = data.getOutQueue();

    ByteBuffer lastBuffer = buffers.getLast();

    try {
        int len = clientSocket.write(lastBuffer);
        if (len == -1) {
            disconnect(key);
            return;
        }

        //完整的写了一个Buffer, 就移除
        if (lastBuffer.remaining() == 0) {
            buffers.removeLast();
        }
    } catch (IOException e) {
        System.out.println("向客户端写入失败.");
        e.printStackTrace();
        disconnect(key);
    }

    //很重要, 如果全部写完, 就要取消OP_WRITE事件监听
    if (buffers.size() == 0) {
        key.interestOps(SelectionKey.OP_READ);
    }

}
write函数的逻辑比较简单, 每次写一个ByteBuffer, 特别要注意的就是每次写完之后, 一定要取消对WRITE事件的监听. 然后是辅助的disconnect()方法, 关闭那个Key对应的Channel, 这样就再也不会出来事件被监听到了.
private void disconnect(SelectionKey key) throws IOException {
    SelectableChannel channel = key.channel();
    channel.close();
}
对于最后的disconnect函数我还有一点暂时没搞明白, 就是如何从selector中去掉要监听的Key, 这样才可以算完成的取消连接. 写完了整个服务器, 发现确实响应很快, 只会在有IO的时候进行操作, 因此每个连接的实际服务时间也只有15ms左右. 与原来的6000ms有天壤之别. 即使客户端非常慢, 服务器也不会花费资源在等待IO上, 这就是NIO的改进.

总结

这个NIO服务器, 一个Selector先监听服务端Socket, 然后出现连接的时候, 将连接也注册同一个Selector内进行监听. 每次监听的时候, 由于服务端Socket和Socket连接的特性不同, 所以可以分为可连接, 可读和可写状态. 可连接就将新连接加入监听. 可读就读一次然后通过attach来在key中共享数据. 可写就进行写入. 其中读通过多线程进行,每次读入到一个ByteBuffer中, 加入到共享的数据对象中. 每次只要有数据传送, 就会读一次, 然后就结束, 不会一直卡在连接上. 写每次写一个ByteBuffer, 但是由于可写的时候不阻塞, 因此会反复写完之前已经收到的数据. 这样服务器对于IO的实际处理和等待时间就大幅下降了. 这个服务器给我的启示还是不少的, 其中用一个Selector监听所有Channel确实是个好办法, 也体验到了多态, 无论SocketServerChannel还是SocketChannel, 都是SelectableChannel, 所以可以被Selector监听.
LICENSED UNDER CC BY-NC-SA 4.0
Comment