Java的并发部分快要看完了, 看完NIO之后就剩下Java 8 函数式编程中的并发内容了. 当然, Java 9中的java.util.concurrent还加了一个反应式编程的Flow. 加上Spring的反应式编程还一直没有仔细的看过, 还是得有机会看看.
在上一个NIO服务器中, 已经实现了不会阻塞等待TCP连接传数据, 不过在doRead()和doWrite()中, 实际的I/O操作还是阻塞的, 比如返回一个文件给客户端, 那么在没有完成文件的读操作之前, 线程依然会阻塞在等待文件I/O操作处.
NIO的最后一部分API, 也就是Channel类中Asynchronous开头的类, 就是为了彻底实现异步IO而准备的. 这些API不仅在Key(文件描述符)没有准备好的时候不阻塞, 就连正常的IO操作, 也不阻塞, 而是等待操作系统完成IO的时候注册一个回调函数来执行. 这样就实现了彻底的异步.
- AIO服务器
- 启动AIO服务器
- AIO客户端
AIO服务器
这四个异步API再回看一下, 第一个是表示一组AIO, 第二个是异步文件, 第三个和第四个就是TCP的AIO API.
AsynchronousChannelGroup
AsynchronousFileChannel
AsynchronousServerSocketChannel
AsynchronousSocketChannel
创建AIO服务器的代码如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class AIOServer {
private final int port;
private AsynchronousServerSocketChannel serverSocketChannel;
public AIOServer(int port) {
this.port = port;
try {
//启动服务器和绑定端口
this.serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port));
} catch (IOException e) {
System.out.println("服务器启动失败");
System.exit(1);
}
}
public void start() {
System.out.println("服务器启动在: " + port + "端口");
//非常关键的一步, 注册一个CompletionHandler, 这个是java.nio.channels中的一个接口, 专门用于AIO的回调
//第一个参数是一个attachment, 这个attachment会在后边的两个方法内作为第二个参数被使用
serverSocketChannel.accept(42, new CompletionHandler<AsynchronousSocketChannel, Object>() {
//这个是必须实现的方法之一, 用于成功的时候如何做
final ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
//由于纯异步, 需要搭配Future对象, 先定义一个要返回给客户端的结果, 然后放在Future对象中
Future<Integer> writeResult = null;
byteBuffer.clear();
//重要, AsynchronousSocketChannel的read方法返回一个Future<Integer>对象, 表示后来读到了几个字节, 因为这个read方法不阻塞.
//非Asynchronous的Channel比如上一次的SocketChannel的read方法返回一个int, 这是因为read是阻塞的.
writeResult = result.read(byteBuffer);
//睡眠1秒来模拟做其他事情
try {
System.out.println("服务端开始做1秒钟的其他事情...");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//只要不调用Future对象的.get()方法, 就不会等待Future对象完成, 所以到这里都不会阻塞
try {
System.out.println("实际读取到了: "+ writeResult.get(100, TimeUnit.SECONDS) + "个字节.");
System.out.println("读取到的是: " + Arrays.toString(byteBuffer.array()));
byteBuffer.flip();
//这个write()方法也一样返回一个Future对象, 然后立刻返回
writeResult = result.write(byteBuffer);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
System.out.println("Future结果读取错误.");
} finally {
serverSocketChannel.accept(null, this);
//等待写完
try {
//确认写完, 然后关闭当前连接的AsyncSocketChannel
writeResult.get();
result.close();
} catch (InterruptedException | ExecutionException | IOException e) {
e.printStackTrace();
}
}
}
//这个是另外一个必须实现的方法, 用于失败的时候如何做
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接失败. 异常是: " + exc);
}
});
}
}
这个服务器唯一的阻塞点, 就是连接创建成功之后, 对读取的Future对象调用.get()方法, 以及在写入的时候, 保证写完, 调用.get()方法.
其他的所有方法, 完全都不阻塞, 确实给力.
启动AIO服务器
启动AIO服务器就不像原来的服务器中有select()阻塞, 启动了就可以, AIO服务器在没有连接进来的时候, 所有方法都是不阻塞的, 相当于用一个主线程启动了在另外一个线程运行的AIO服务器. 如果没有连接进来, 主线程运行结束之后, 整个程序也结束了, 因为不阻塞, 全部运行到底了. 又没有while(true)循环, 所以主线程可以做自己的事情, 然后睡眠等待.
public static void main(String[] args) throws IOException, InterruptedException {
//纯异步, 直接结束
new AIOServer(8000).start();
//这里让主线程等待, 后台服务器已经在另外一个线程中开启. 由于所有方法都不阻塞, 主线程退出的时候服务器也关闭了.
while (true) {
Thread.sleep(1000);
}
}
AIO客户端
看了AIO服务端, 再来看看客户端的编写. 客户端的编写与服务端非常类似, 只不过是直接使用AsyncSocketChannel来进行连接, 也一样需要注册回调对象.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
public class AIOClient {
private final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
public AIOClient() throws IOException {
}
public void connect(int port) {
//关键的connect函数, 第二个参数是attachment, 最后是一个回调对象
client.connect(new InetSocketAddress("localhost", port), null, new CompletionHandler<Void, Object>() {
//连接成功后执行这个方法
@Override
public void completed(Void result, Object attachment) {
System.out.println("连接成功");
//内部对于写入, 有一个参数的返回Future对象的方法, 也有三参数就像这里的, 继续回调的方法, 因为连接成功后需要发送数据, 所以继续回调
client.write(ByteBuffer.wrap("Hello!".getBytes(StandardCharsets.UTF_8)), null, new CompletionHandler<Integer, Object>() {
//到这里是写入成功, 所以继续回调, 要进行读取
@Override
public void completed(Integer result, Object attachment) {
System.out.println("写入成功");
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.clear();
client.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
//运行到这里说明读取成功
@Override
public void completed(Integer result, Object attachment) {
System.out.println("读取成功");
byteBuffer.flip();
System.out.println("from server: " + Arrays.toString(byteBuffer.array()));
//读取成功后关闭客户端连接
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//读取失败
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("从服务器读取失败");
}
});
}
//这里是写入失败
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("向服务器写入失败");
}
});
}
//这里是连接失败
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接失败");
}
});
}
}
由于是纯异步, 所以这里其实是一个回调地狱. Java写着写着写出JavaScript的感觉来了.AsynchronousSocketChannel
的connect, read和write方法, 全部都可以继续注册回调对象, 表明成功之后的下一步动作.
因此这个客户端的逻辑是: 先进行连接, 连接成功后有一个回调去尝试发送数据, 在发送数据的write()中再注册回调, 如果发送成功, 就尝试读取. 在读取函数中再注册回调, 如果读取成功就打印出来.
虽然是异步, 但是整个过程中, 主线程都不阻塞, 可以做其他事情. 就像这样调用:
public static void main(String[] args) throws IOException, InterruptedException {
AIOClient client = new AIOClient();
client.connect(8000);
while (true) {
System.out.println("做其他事情");
Thread.sleep(1000);
}
}
这种纯粹的不阻塞的IO方式看完了, 其实感觉主要还是依赖Future对象, 用来实现异步的调用, 而Future其实本质就是一个线程池加上Callable对象的封装. 与底层操作系统提供的异步IO也有着直接关系.
接下来就是函数式编程并发的部分了.