IO多路复用通过NIO的Selector相关API来实现, 先看一下基础的操作, 然后编写两个服务器来进行对比. 这里还涉及到Java的网络编程, 在后边也要专门看一下了.
明天就要发布高考分数了, 这里还是大声疾呼一下, 不管多少分, 能报名计算机专业的报名计算机, 报不上和考文科的, 暑假里就报班学Java, 别等了, 真的.
- IO多路复用的基础结构
- 不使用NIO的多线程Echo服务器
- 将多线程Echo服务器使用NIO改造
- 总结
IO多路复用的基础结构
IO多路复用的基础结构, 就是之前提到过的NIO中的Selector相关的API. 包括:
- 支持Selector特性的Channel:
SelectableChannel
Seletor
, 用于监听多路IO的选择器, 其名字来源于操作系统的系统调用select()函数
SelectionKey
, 用于让监听多路IO的选择器知道监听哪一路Channel(文件描述符)的对象, 代表被选择器监听的某一个Channel.
在之前看过操作系统, Java里的操作逻辑实际上和C语言使用系统调用是一样的:
- 创建Selector对象和需要被其监听的一系列Channel对象
- 将Channel对象注册到Selector对象中, 每次注册都可以获取一个独立的SelectionKey, 用于标识某个Channel
- 调用select()方法, 这个方法会阻塞, 直到被监听的Channel中有至少一个可用, 然后返回所有可用的Channel
- 遍历Channel, 对每个Channel进行处理
所以这个逻辑和系统调用select()返回可用的文件描述符的集合如出一辙, 只不过因为是面向对象包了很多东西, 所以在使用的时候还有一些小细节需要注意. 来看看具体的代码吧.
首先是需要知道哪些Channel支持Selector特性, 通过查看文档可以知道, java.nio.channels中有一个类SelectableChannel, 是支持Selector(can be multiplexed via a Selector)的.
然后这个类有一个子类也是一个抽象类AbstractSelectableChannel, 这个抽象类有五个实现类: DatagramChannel, Pipe.SinkChannel, Pipe.SourceChannel, ServerSocketChannel, SocketChannel.
对于网络IO来说, 我们关心其中的DatagramChannel, ServerSocketChannel, SocketChannel, 也就是UDP端, TCP服务端和TCP客户端.
然后是Selector类, java.nio.Channels中的Selector类, 有一个静态方法.open(), 用来创建一个Selector. 就使用这个方法来创建Selector.
有了Channel和Selector之后, 就是将Channel注册到Selector上, 也就是调用channel.ssc.register(selector, SelectionKey.OP_ACCEPT);
.
这里的第二个参数指的是监听什么事件, 对于Socket通信来说唯一可以监听的就是ACCEPT, 也就是连接有数据到达的事件. 这个注册会返回一个SelectionKey对象, 通过这个对象就可以知道是哪个Channel出现事件.
这样三大组件即Channel, Selector, SelectorKey都知道如何创建和使用了, 下边就来看一下细节:
第一步, 准备若干个Channel
//新创建一个ServerSocketChannel, 也就是TCP服务端
ServerSocketChannel serverSocketChannelAt8000 = ServerSocketChannel.open();
//很重要, 将其设置为异步模式, 否则还是同步模式
serverSocketChannelAt8000.configureBlocking(false);
//只有了TCP服务端Channel, 还需要从中获取其内部包装的ServerSocket对象用来绑定端口
ServerSocket socket = serverSocketChannelAt8000.socket();
InetSocketAddress address = new InetSocketAddress(8000);
socket.bind(address);
//继续创建一个绑定7000, 8888端口的channel
ServerSocketChannel serverSocketChannelAt7000 = ServerSocketChannel.open();
serverSocketChannelAt7000.socket().bind(new InetSocketAddress(7000));
serverSocketChannelAt7000.configureBlocking(false);
ServerSocketChannel serverSocketChannelAt8888 = ServerSocketChannel.open();
serverSocketChannelAt8888.socket().bind(new InetSocketAddress(8888));
serverSocketChannelAt8888.configureBlocking(false);
到这里其实可以在后边加一句serverSocketChannel.accept();
, 运行会发现程序直接结束, 这是因为设置了异步, 并没有等待TCP连接进入.
如果改成同步, 则serverSocketChannel.accept();
这句就会一直阻塞到有访问进来.
第二步, 配置Selector和进行注册绑定.
//创建一个selector用于监听
Selector selector = Selector.open();
//channel调用自己的register方法向selector中注册, 并得到一个SelectionKey对象, 当然此时这个Key没有什么用, 实际用的是每个连接进来的描述符对应的Key
//将三个Channel都注册到selector中
SelectionKey key8000 = serverSocketChannelAt8000.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key7000 = serverSocketChannelAt7000.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key8888 = serverSocketChannelAt8888.register(selector, SelectionKey.OP_ACCEPT);
第三步, 启动服务器的主循环, 监听所有Channel, 然后每次对就绪的Channel进行操作:
while (true) {
//Selector进行监听, 返回一个int, 这个指有几个Channel出现了事件, 即IO可用
//注意这个方法是阻塞的, 也就是如果程序执行到了此行之后的语句, 一定会有可用的IO出现
int number = selector.select();
//获取所有可用的keys, 类似于获取所有就绪的Channel
Set<SelectionKey> availableKeys = selector.selectedKeys();
//获取迭代器, 用于所有就绪的Channel
Iterator<SelectionKey> iterable = availableKeys.iterator();
while (iterable.hasNext()) {
SelectionKey selectionKey = iterable.next();
//这里要注意, 处理完一个之后, 需要立刻将其从迭代中去除, 否则下一次还会继续监听到这个端口
iterable.remove();
if (selectionKey.isAcceptable()) {
//从SelectionKey中获取channel对象, 因为知道类型, 所以强制转换, 然后可以从其中获取连接的信息
ServerSocketChannel newChannel = (ServerSocketChannel) selectionKey.channel();
System.out.println("接受连接来自: "+ newChannel.socket().getLocalPort());
//从ServerSocketChannel中获取SocketChannel. 也就是TCP连接
SocketChannel socketChannel = newChannel.accept();
//将内容一次性读入到2048长度的字节中
ByteBuffer byteBufferIn = ByteBuffer.allocate(2048);
System.out.println(socketChannel.read(byteBufferIn));
byteBufferIn.flip();
//这一块也是NIO的用法, 和Charset相关
//将其按照UTF-8进行解码然后放到CharBuffer中, 打印出来
Charset utf8 = Charset.forName("UTF-8");
CharsetDecoder decoder = utf8.newDecoder();
CharBuffer charBufferin = decoder.decode(byteBufferIn);
System.out.println(charBufferin.array());
//直接关闭连接
socketChannel.close();
} else if (selectionKey.isValid() && selectionKey.isReadable()) {
System.out.println("可以读");
} else if (selectionKey.isValid() && selectionKey.isWritable()) {
System.out.println("可以写");
}
}
}
然后就可以启动程序了, 启动之后, 这个selector会同时监听7000,8000,8888端口的三个Channel, 哪个有连接进来, 就会创建连接然后打印出传来的内容.
用浏览器访问http://localhost:8888/
, http://localhost:8000/
, http://localhost:7000/
. 不管哪一个访问进来, selector都可以创建连接.
这样一个线程就可以监听很多IO通道, 而不用每来一个IO都用一个线程去响应. 这里还是使用了单线程进行处理, 实际上, 还可以在每一个SocketServerChannel出现ON_ACCEPT事件的时候, 将每一个Socket连接交给一个线程去处理, 这样就是一个线程监听多个Channel, 然后进行分发, 效率更高. 而且不会等待IO.
不使用NIO的多线程Echo服务器
这一节跟着书上写一个多线程不使用NIO的服务器看看, 看来Java 网络编程得马上补上了. 计算机网络这本书还没看, 好在马上就要把并发干完了.
先是服务器, 这是不使用NIO的传统多线程服务器, 每进来一个新连接, 就将这个连接交给一个新线程去处理, 这也是学C语言的时候写过的老套路了, 直接上代码:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MuiltThreadEchoServer {
//服务器端口与构造器
int port;
public MuiltThreadEchoServer(int port) {
this.port = port;
}
//服务器的线程池
private final ExecutorService pool = Executors.newCachedThreadPool();
//处理每个客户端连接的线程类
static class HandleMsg implements Runnable {
//私有变量与构造器, 保存当前Socket连接
Socket clientSocket;
public HandleMsg(Socket clientSocket) {
this.clientSocket = clientSocket;
}
//核心的run()方法
@Override
public void run() {
BufferedReader reader = null;
PrintWriter writer = null;
//设置好reader和writer
try {
reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
writer = new PrintWriter(clientSocket.getOutputStream(), true);
//尝试读出客户端发来的内容, 然后原样写入
String inputLine = null;
long b = System.currentTimeMillis();
while ((inputLine = reader.readLine()) != null) {
writer.println(inputLine);
}
long e = System.currentTimeMillis();
System.out.println("spend: " + (e - b) + " ms");
} catch (IOException e) {
e.printStackTrace();
}finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (writer != null) {
writer.close();
}
try {
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
//启动服务器
public void start() {
ServerSocket echoServer = null;
Socket clientSocket = null;
//创建服务端Socket, 绑定8000端口, 启动失败直接退出
try{
echoServer = new ServerSocket(port);
} catch (IOException e) {
System.out.println("服务器启动失败");
System.exit(1);
}
while (true) {
try {
//新进来连接就创建一个新线程交给线程池
clientSocket = echoServer.accept();
System.out.println(clientSocket.getRemoteSocketAddress() + " connected.");
pool.execute(new HandleMsg(clientSocket));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
然后是客户端, 这里的客户端就比较搞, 连接成功之后不会释放连接, 而是故意一个一个字的输入, 这样每一个新创建的线程就会卡在等待数据上很久.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
public class Client implements Runnable {
@Override
public void run() {
Socket client = null;
PrintWriter writer = null;
BufferedReader reader = null;
try {
client = new Socket();
client.connect(new InetSocketAddress("localhost", 8000));
writer = new PrintWriter(client.getOutputStream());
//这里把要写入的信息分6次发送, 发送间隔一秒, 一次发送一个字符
String msg = "Hello!";
for (int i = 0; i < msg.length(); i++) {
Thread.sleep(1000);
writer.print(msg.charAt(i));
}
//要写一个println(), 相当于对面收到EOF, 才能完成一次发送
writer.println();
writer.flush();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
System.out.println("from server: " + reader.readLine());
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (writer != null) {
writer.close();
}
if (client != null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
//这里启动10个线程去连接, 可以发现都连接成功, 但是都耗时6秒钟. 有没有办法让线程不等待呢.
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new Client()).start();
}
}
}
将多线程Echo服务器使用NIO改造
上边的那个例子的核心在于, 如果有大量线程连接之后不做什么事情, 那么服务器没有从客户端连接中获取数据, 无法做使用, 同时又不能杀死连接线程, 就会导致同一时刻有大量的线程被创建但是在等待IO, 会拖慢系统.
现在就用全新的NIO来改造线程服务器, 不发给我数据, 我就不等待, 来了一个数据, 响应一次, 不来, 就当做没事一样.
根据上边的NIO知识, 监听所有可用的I/O对象, 也就是客户端连接, 只需要使用一个线程就行了, 然后每有客户端连接进来数据, 就交给一个线程去处理就行. 如果没有新进来的数据, 那么也没有线程被创建出来进行工作.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NIOEchoServer {
private Selector selector;
private final ExecutorService pool = Executors.newCachedThreadPool();
private final Map<Socket, Long> time = new HashMap<>();
int port;
public NIOEchoServer(int port) {
this.port = port;
}
public void startServer() throws IOException {
//创建Selector
Selector selector = Selector.open();
//创建TCP服务器Channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//绑定到端口
serverSocketChannel.socket().bind(new InetSocketAddress("localhost", port));
//这个套路都知道了, 注册给selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//服务器主循环
while (true) {
//阻塞在此, 监听channel
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
long e = 0;
//这里其实就一个Channel, 只要有连接或者发送信息, 就是出现响应
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
//这个判断是有新连接进来
if (key.isAcceptable()) {
doAccept(key);
}
//可读, 相当于客户端发送信息过来, 记录时间后执行读取动作
else if (key.isReadable() && key.isValid()) {
//向记录时间的Map中写入时间
if (!time.containsKey(((SocketChannel) key.channel()).socket())) {
time.put(((SocketChannel) key.channel()).socket(), System.currentTimeMillis());
}
doRead(key);
//可写, 相当于客户端发完了信息, 在等待响应
} else if (key.isWritable() && key.isValid()) {
doWrite(key);
e = (System.currentTimeMillis());
//从记录时间的map中取出对应的时间, 然后比较一下耗时
long b = time.remove(((SocketChannel) key.channel()).socket());
System.out.println("耗时: " + (e - b) + " ms");
}
}
}
}
private void doWrite(SelectionKey key) {
}
private void doRead(SelectionKey key) {
}
private void doAccept(SelectionKey key) {
}
}
这个逻辑基本上和非NIO的一样, NIO的selector就监听了一个端口的SocketServerChannel, 具体还是要放到方法里去执行从SocketServerChannel获取SocketChannel的方法.
下边来实现一下三个方法, 首先是doAccept():
private void doAccept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel clientSocket = serverSocketChannel.accept();
//凡是支持selectableChannel的, 都要设置成非阻塞
clientSocket.configureBlocking(false);
//这里很关键, 把获取的具体ClientChannel, 也注册到同一个selector上, 注册事件为可读, 此时已经连接完毕, 下一个事件就是可读了.
//这样同一个selector, 就可以既监听TCP进来连接, 也可以监听已经创建的连接了, 前边的startServer()中的方法的三种分支就有用武之地了.
SelectionKey clientKey = clientSocket.register(selector, SelectionKey.OP_READ);
//由于客户端不是一次性发完全部数据, 而是慢慢发送, 因此给key attach一个对象, 就像是MVC里的model一样, 以后再使用这个key, 都可以共享这个数据对象
ClientData data = new ClientData();
clientKey.attach(data);
//doAccept()的作用就是获取TCP连接, 将连接加入到Selector的监听中, 然后附加上一个数据对象用于和客户端的通信, 之后就结束了.
}
然后是doRead()方法, 这个是指一个SocketChannel可读之后, 会被Selector监听到, 然后在主循环中交给doRead()方法来使用:
private void doRead(SelectionKey key) throws IOException {
//注意, 进到这里的SelectionKey不是ServerSocketChannel的Key, 而是SocketChannel的Key, 这是因为能够监听到READ事件的是在Accept()方法里注册SocketChannel的Key
SocketChannel clientSocket = (SocketChannel) key.channel();
//创建8K大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(8192);
int len;
//尝试读取
try {
len = clientSocket.read(byteBuffer);
//如果读取结果是-1, 说明读取完毕, 直接结束
if (len < 0) {
disconnect(key);
return;
}
} catch (IOException e) {
//出错也关闭channel
System.out.println("Failed to read from client.");
e.printStackTrace();
disconnect(key);
}
byteBuffer.flip();
//这里使用了一个处理消息的类, 交给线程池进行处理
pool.execute(new HandleMsg(key, byteBuffer));
}
来马上看一下数据对象ClientData和对应的HandleMsg类:
import java.nio.ByteBuffer;
import java.util.LinkedList;
public class ClientData {
private LinkedList<ByteBuffer> outQueue;
public ClientData() {
outQueue = new LinkedList<>();
}
public LinkedList<ByteBuffer> getOutQueue() {
return outQueue;
}
public void enqueue(ByteBuffer buffer) {
outQueue.addFirst(buffer);
}
}
private class HandleMsg implements Runnable {
ByteBuffer byteBuffer;
SelectionKey key;
public HandleMsg(SelectionKey key, ByteBuffer byteBuffer) {
this.key = key;
this.byteBuffer = byteBuffer;
}
@Override
public void run() {
ClientData data = (ClientData) key.attachment();
data.enqueue(byteBuffer);
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
selector.wakeup();
}
}
数据对象内部是一个队列, 保存每次从客户端读到的8K长度的ByteBuffer, HandleMsg则是获取数据对象然后把读到的消息放进去. 因为读过了消息, 所以将那个key的感兴趣事件从只是READ变成READ+WRITE.
然后强制去唤醒selector, 如果selector阻塞, 这个时候会重新再开始一轮监听, 此时就可以监听key新增的WRITE事件了.
最后是doWrite()函数, 与doRead()一样, 进来的key是SocketChannel的key, 而不是ServerSocketChannel的key:
private void doWrite(SelectionKey key) throws IOException {
//注意, 进到这里的SelectionKey不是ServerSocketChannel的Key, 而是SocketChannel的Key, 这是因为能够监听到READ事件的是在Accept()方法里注册SocketChannel的Key
SocketChannel clientSocket = (SocketChannel) key.channel();
ClientData data = (ClientData) key.attachment();
LinkedList<ByteBuffer> buffers = data.getOutQueue();
ByteBuffer lastBuffer = buffers.getLast();
try {
int len = clientSocket.write(lastBuffer);
if (len == -1) {
disconnect(key);
return;
}
//完整的写了一个Buffer, 就移除
if (lastBuffer.remaining() == 0) {
buffers.removeLast();
}
} catch (IOException e) {
System.out.println("向客户端写入失败.");
e.printStackTrace();
disconnect(key);
}
//很重要, 如果全部写完, 就要取消OP_WRITE事件监听
if (buffers.size() == 0) {
key.interestOps(SelectionKey.OP_READ);
}
}
write函数的逻辑比较简单, 每次写一个ByteBuffer, 特别要注意的就是每次写完之后, 一定要取消对WRITE事件的监听.
然后是辅助的disconnect()方法, 关闭那个Key对应的Channel, 这样就再也不会出来事件被监听到了.
private void disconnect(SelectionKey key) throws IOException {
SelectableChannel channel = key.channel();
channel.close();
}
对于最后的disconnect函数我还有一点暂时没搞明白, 就是如何从selector中去掉要监听的Key, 这样才可以算完成的取消连接.
写完了整个服务器, 发现确实响应很快, 只会在有IO的时候进行操作, 因此每个连接的实际服务时间也只有15ms左右. 与原来的6000ms有天壤之别. 即使客户端非常慢, 服务器也不会花费资源在等待IO上, 这就是NIO的改进.
总结
这个NIO服务器, 一个Selector先监听服务端Socket, 然后出现连接的时候, 将连接也注册同一个Selector内进行监听.
每次监听的时候, 由于服务端Socket和Socket连接的特性不同, 所以可以分为可连接, 可读和可写状态. 可连接就将新连接加入监听. 可读就读一次然后通过attach来在key中共享数据. 可写就进行写入.
其中读通过多线程进行,每次读入到一个ByteBuffer中, 加入到共享的数据对象中. 每次只要有数据传送, 就会读一次, 然后就结束, 不会一直卡在连接上.
写每次写一个ByteBuffer, 但是由于可写的时候不阻塞, 因此会反复写完之前已经收到的数据.
这样服务器对于IO的实际处理和等待时间就大幅下降了. 这个服务器给我的启示还是不少的, 其中用一个Selector监听所有Channel确实是个好办法, 也体验到了多态, 无论SocketServerChannel还是SocketChannel, 都是SelectableChannel, 所以可以被Selector监听.