从阻塞,到非阻塞,再到异步的网络通讯介绍

本文主要用JDK中的NIO包中类和方法,完成一个“客户端-服务端的网络通讯demo代码”的阻塞->非阻塞->异步演进过程。

本文涉及的代码可在这里获取

V1.0 阻塞版

服务端

主体程序

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ServerV1 {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        serverSocketChannel.socket().bind(new InetSocketAddress(8088));
        System.out.println("开始监听8088端口...");
        while(true){
            SocketChannel socketChannel = serverSocketChannel.accept();
            ServerSocketHandler handler = new ServerSocketHandler(socketChannel);
            new Thread(handler).start();
        }
    }
}

消息处理方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ServerSocketHandler implements Runnable {
    private SocketChannel socketChannel;
    public ServerSocketHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }
    @Override
    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            while (socketChannel.read(buf) > 0) {
                buf.flip();

                byte[] bytes = buf.array();

                String sr = new String(bytes, "UTF-8");
                System.out.println("服务端——收到请求:" + sr);
                ByteBuffer writeBuf = ByteBuffer.wrap(("服务端已收到请求:" + sr).getBytes());
                socketChannel.write(writeBuf);
                buf.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

说明:服务端每收到一个连接请求,就起一个线程去连接并处理消息。

缺点:

  • 每个连接一个线程,不能支持高并发,此外,线程切换的开销也比较大;
  • 当接收连接请求后就新建线程去处理,但此时可能没有数据,读取数据的操作(SocketChannel#read)就阻塞了。

客户端

一个简单地客户端,发送消息并接收服务端返回消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ClientV1 {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8088));
        ByteBuffer buf = ByteBuffer.wrap("first Message".getBytes());

        socketChannel.write(buf);
        ByteBuffer readBuf = ByteBuffer.allocate(1024);
        if (socketChannel.read(readBuf) > 0) {
            readBuf.flip();
            byte[] bytes = readBuf.array();
            String str = new String(bytes, "UTF-8");
            System.out.println("收到回复:" + str);
        }
    }
}

V2.0 非阻塞版

相对于阻塞版,非阻塞版通过使用多路复用器(Selector)去管理多个通道。看代码:

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class ServerV2 {
    public static void main(String[] args) throws IOException {
        // 开启一个多路复用器
        Selector selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(8088));
        // 将 serverSocketChannel 设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 将 serverSocketChannel 注册到Selector中,监听 OP_ACCEPT 事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 轮询
            int channels = selector.select();
            if (channels == 0) {
                continue;
            }
            System.out.println("轮询到有事件");
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 遍历
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey next = iterator.next();
                iterator.remove();
                if (next.isAcceptable()) {
                    System.out.println("有新的连接到来");
                    // 有新连接,将 socketChannel 注册到多路复用器中
                    // 设置非阻塞模式,并监听 OP_READ 事件
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (next.isReadable()) {
                    System.out.println("有连接可读");
                    SocketChannel socketChannel = (SocketChannel) next.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    int num = socketChannel.read(buf);
                    if (num > 0) {
                        String msg = new String(buf.array(), "UTF-8").trim();
                        System.out.println("服务端——接收到消息:" + msg);
                        ByteBuffer writeBuf = ByteBuffer.wrap(("服务端已收到请求:" + msg).getBytes());
                        socketChannel.write(writeBuf);
                    } else if (num < 0) {
                        socketChannel.close();
                    }
                }
            }
        }
    }
}

客户端可继续沿用V1中的客户端。

用了多路复用器后,可以只用一个线程来轮询这个 Selector,看看是否有通道是准备好的(有事件)。 当通道准备好可读或可写,然后才去开始真正的读写,这样速度就很快了。完全没有必要给每个通道都起一个线程。

Selector简介

NIO 中 Selector 是对底层操作系统实现的一个抽象,管理通道状态其实都是底层系统实现的,这里简单介绍下在不同系统下的实现。

select:上世纪 80 年代就实现了,它支持注册 FD_SETSIZE(1024) 个 socket,在那个年代肯定是够用的,不过现在嘛,肯定是不行了。

poll:1997 年,出现了 poll 作为 select 的替代者,最大的区别就是,poll 不再限制 socket 数量。

select 和 poll 都有一个共同的问题,那就是它们都只会告诉你有几个通道准备好了,但是不会告诉你具体是哪几个通道。所以,一旦知道有通道准备好以后,自己还是需要进行一次扫描,显然这个不太好,通道少的时候还行,一旦通道的数量是几十万个以上的时候,扫描一次的时间都很可观了,时间复杂度 O(n)。所以,后来才催生了以下实现。

epoll:2002 年随 Linux 内核 2.5.44 发布,epoll 能直接返回具体的准备好的通道,时间复杂度 O(1)。

除了 Linux 中的 epoll,2000 年 FreeBSD 出现了 Kqueue,还有就是,Solaris 中有 /dev/poll。

在Window中,非阻塞IO只能使用select。

V3.0 异步IO版

个人感觉,异步和非阻塞有一个区别是: 非阻塞只是不让你等待在那里啥也不干,如V2.0里所现,有事件了通知你去处理; 而异步的话则更进一步,有事件,ta直接调用回调函数给你处理完了。

JDK1.7后有了异步IO的接口,异步IO提供了两种使用方式:使用Future,或使用回调函数CompletionHandler

1
2
3
4
public interface CompletionHandler<V,A> {
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

服务端

相比非阻塞版本,此处的服务端的ServerSocketChannel变成了AsynchronousServerSocketChannel

相似的,SocketChannelFileChannel的AIO版本也分别加了 Asynchronous 前缀。

主体程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ServerV3 {
    public static void main(String[] args) throws IOException, InterruptedException {
        
        final AsynchronousServerSocketChannel asyncServer = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress(8088));
        // 自己定义了一个 Attachment ,用于传递一些信息
        Attachment attachment = new Attachment();
        attachment.setServer(asyncServer);

        asyncServer.accept(attachment, new CompletionHandler<AsynchronousSocketChannel, Attachment>() {
            public void completed(AsynchronousSocketChannel client, Attachment attachment) {
                try {
                    SocketAddress clientAddr = client.getRemoteAddress();
                    System.out.println("收到连接请求:" + clientAddr);

                    attachment.getServer().accept(attachment, this);

                    Attachment att1 = new Attachment();
                    att1.setServer(asyncServer);
                    att1.setClient(client);
                    att1.setBuffer(ByteBuffer.allocate(1024));
                    att1.setReadMode(true);
                    // 读连接的客户端的数据,此处为了美观期间,回调函数没有用匿名函数
                    client.read(att1.getBuffer(), att1, new ServerChannelHandler());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            public void failed(Throwable exc, Attachment attachment) {
                System.out.println("accept failed!");
            }
        });
        Thread.currentThread().join();
    }
}

回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ServerChannelHandler implements CompletionHandler<Integer, Attachment> {
    public void completed(Integer result, Attachment attachment) {
        if (attachment.isReadMode()) {
            ByteBuffer byteBuf = attachment.getBuffer();
            byteBuf.flip();
            byte[] bytes = byteBuf.array();
            String msg = new String(byteBuf.array()).trim();
            System.out.println("收到客户端请求:" + msg);

            byteBuf.clear();
            byteBuf.put(("服务端反馈:" + msg).getBytes(Charset.forName("UTF-8")));
            attachment.setReadMode(false);
            byteBuf.flip();
            attachment.getClient().write(byteBuf, attachment, this);
        }else{
            // 此处代表往客户端写数据完毕后就关闭了,也可继续转回成读模式
            try {
                attachment.getClient().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void failed(Throwable exc, Attachment attachment) {
        System.out.println("连接断开");
    }
}

附加信息类 Attachment

1
2
3
4
5
6
7
public class Attachment {
    private AsynchronousServerSocketChannel server;
    private AsynchronousSocketChannel client;
    private boolean isReadMode;
    private ByteBuffer buffer;
    // 省略 getter/setter 方法
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ClientV2 {
    public static void main(String[] args) throws Exception {
        AsynchronousSocketChannel asyncClient = AsynchronousSocketChannel.open();
        // 这里用了 future 的方式
        Future<Void> connectFuture = asyncClient.connect(new InetSocketAddress("127.0.0.1", 8088));
        // 在此阻塞直到连接成功
        connectFuture.get();
        Attachment attachment = new Attachment();
        attachment.setClient(asyncClient);
        attachment.setReadMode(false);
        ByteBuffer byteBuf = ByteBuffer.allocate(2048);
        byte[] bytes = "I am async client.".getBytes();
        byteBuf.put(bytes);
        byteBuf.flip();
        attachment.setBuffer(byteBuf);
        asyncClient.write(byteBuf, attachment, new ClientChannelHandler());

        Thread.currentThread().join();
    }
}

Group简介

在代码中并没有涉及到group这个概念,但还是有必要介绍一下,group可以理解为Channel的集合。

在AIO中,AsynchronousChannelGroup(下文简称 ACG)在linux中的实现类是EPollPort,在windows中是Iopc

ACG 内持有fileDescriptorchannel的映射,从epoll返回的事件可以间接的找到fileDescriptor,通过映射找到channel,从而完成io;

ACG 还持有线程池,自动开启,用于异步处理io,执行CompletionHandler

AsynchronousServerSocketChannelsAsynchronousSocketChannels 是属于 group 的。 当我们调用 AsynchronousServerSocketChannelAsynchronousSocketChannelopen() 方法的时候,相应的 channel 就属于默认的 group,这个 group 由 JVM 自动构造并管理。 也可在open()中指定group参数。

如果我们想要配置这个默认的 group,可以在 JVM 启动参数中指定以下系统变量:

  • java.nio.channels.DefaultThreadPool.threadFactory 用于设置 ThreadFactory
    应该是 java.util.concurrent.ThreadFactory 实现类的全限定类名。一旦指定了这个 ThreadFactory 以后,group 中的线程就会使用该类产生。
  • java.nio.channels.DefaultThreadPool.initialSize 用于设置线程池的初始大小

若想使用自己定义的 group,以便其中的线程进行更多的控制,使用以下几个方法即可:

  • ACG#withCachedThreadPool(ExecutorService executor, int initialSize)
  • ACG#withFixedThreadPool(int nThreads, ThreadFactory threadFactory)
  • ACG#withThreadPool(ExecutorService executor)