养生
accept函数(看我如何把NIO拉下神坛)

1. 传统的阻塞式I/O


因为socket的accept函数,read函数,write函数是同步阻塞的,所以主线程不断调用socket的accept函数,轮询状态是established的TCP连接。

当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。

阻塞式I/O模型


阻塞式I/O的缺点

缺乏扩展性,严重依赖线程。Java的线程占用内存在512K-1M,线程数量过多会导致JVM内存溢出。大量的线程上下文切换严重消耗CPU性能。大量的I/O线程被激活会导致系统锯齿状负载。

2. NIO编程

同步非阻塞I/O模型


对于NIO来说,如果内核缓冲区中没有数据就直接返回一个EWOULDBLOCK错误,一般来说进程可以轮询调用read函数,当缓冲区中有数据的时候将数据复制到用户空间,而不用挂起线程。

在linux系统中,可以使用select/poll/epoll使用一个线程监控多个socket,只要有一个socket的读缓存有数据了,方法就立即返回,然后你就可以去读这个可读的socket了,如果所有的socket读缓存都是空的,则会阻塞,也就是将线程挂起。

NIO实际上是一个事件驱动的模型,NIO中最重要的就是多路复用器(Selector)。在NIO中它提供了选择就绪事件的能力,我们只需要把通道(Channel) 注册到Selector上,Selector就会通过select方法(实际上操作系统是通过epoll)不断轮询注册在其上的Channel,如果某个Channel上发生了读就绪、写就绪或者连接到来就会被Selector轮询出来,然后通过SelectionKey(Channel注册到Selector上时会返回和其绑定的SelectionKey)可以获取到已经就绪的Channel集合,否则Selector就会阻塞在select方法上。

多路复用的核心目的就是使用最少的线程去操作更多的通道,在其内部并不是只有一个线程。创建线程的个数是根据通道的数量来决定的,每注册1023个通道就创建1个新的线程。

不同类型通道支持的事件

NIO事件模型示意图

ServerReactor

@Slf4jpublic class ServerReactor implements Runnable {    private final Selector selector;    private final ServerSocketChannel serverSocketChannel;    private volatile boolean stop = false;    public ServerReactor(int port, int backlog) throws IOException {        selector = Selector.open();        serverSocketChannel = ServerSocketChannel.open();        ServerSocket serverSocket = serverSocketChannel.socket();        serverSocket.bind(new InetSocketAddress(port), backlog);        serverSocket.setReuseAddress(true);        serverSocketChannel.configureBlocking(false);        // 将channel注册到多路复用器上,并监听ACCEPT事件        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);    }    public void setStop(boolean stop) {        this.stop = stop;    }    @Override    public void run() {        try {            // 无限的接收客户端连接            while (!stop && !Thread.interrupted()) {                int num = selector.select();                Set<SelectionKey> selectionKeys = selector.selectedKeys();                Iterator<SelectionKey> it = selectionKeys.iterator();                while (it.hasNext()) {                    SelectionKey key = it.next();                    // 移除key,否则会导致事件重复消费                    it.remove();                    try {                        handle(key);                    } catch (Exception e) {                        if (key != null) {                            key.cancel();                            if (key.channel() != null) {                                key.channel().close();                            }                        }                    }                }            }        } catch (IOException e) {            e.printStackTrace();        }        if (selector != null) {            try {                selector.close();            } catch (IOException e) {                e.printStackTrace();            }        }    }    private void handle(SelectionKey key) throws Exception {        if (key.isValid()) {            // 如果是ACCEPT事件,代表是一个新的连接请求            if (key.isAcceptable()) {                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();                // 相当于三次握手后,从全连接队列中获取可用的连接                // 必须使用accept方法消费ACCEPT事件,否则将导致多路复用器死循环                SocketChannel socketChannel = serverSocketChannel.accept();                // 设置为非阻塞模式,当没有可用的连接时直接返回null,而不是阻塞。                socketChannel.configureBlocking(false);                socketChannel.register(selector, SelectionKey.OP_READ);            }            if (key.isReadable()) {                SocketChannel socketChannel = (SocketChannel) key.channel();                ByteBuffer readBuffer = ByteBuffer.allocate(1024);                int readBytes = socketChannel.read(readBuffer);                if (readBytes > 0) {                    readBuffer.flip();                    byte[] bytes = new byte[readBuffer.remaining()];                    readBuffer.get(bytes);                    String content = new String(bytes);                    System.out.println("recv client content: " + content);                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);                    writeBuffer.put(("服务端已收到: " + content).getBytes());                    writeBuffer.flip();                    socketChannel.write(writeBuffer);                } else if (readBytes < 0) {                    key.cancel();                    socketChannel.close();                }            }        }    }}

ClientReactor

public class ClientReactor implements Runnable {    final String host;    final int port;    final SocketChannel socketChannel;    final Selector selector;    private volatile boolean stop = false;    public ClientReactor(String host, int port) throws IOException {        this.socketChannel = SocketChannel.open();        this.socketChannel.configureBlocking(false);        Socket socket = this.socketChannel.socket();        socket.setTcpNoDelay(true);        this.selector = Selector.open();        this.host = host;        this.port = port;    }    @Override    public void run() {        try {            // 如果通道呈阻塞模式,则立即发起连接;            // 如果呈非阻塞模式,则不是立即发起连接,而是在随后的某个时间才发起连接。            // 如果连接是立即建立的,说明通道是阻塞模式,当连接成功时,则此方法返回true,连接失败出现异常。            // 如果此通道处于阻塞模式,则此方法的调用将会阻塞,直到建立连接或发生I/O错误。            // 如果连接不是立即建立的,说明通道是非阻塞模式,则此方法返回false,            // 并且以后必须通过调用finishConnect()方法来验证连接是否完成            // socketChannel.isConnectionPending()判断此通道是否正在进行连接            if (socketChannel.connect(new InetSocketAddress(host, port))) {                socketChannel.register(selector, SelectionKey.OP_READ);                doWrite(socketChannel);            } else {                socketChannel.register(selector, SelectionKey.OP_CONNECT);            }            while (!stop && !Thread.interrupted()) {                int num = selector.select();                Set<SelectionKey> selectionKeys = selector.selectedKeys();                Iterator<SelectionKey> it = selectionKeys.iterator();                while (it.hasNext()) {                    SelectionKey key = it.next();                    // 移除key,否则会导致事件重复消费                    it.remove();                    try {                        handle(key);                    } catch (Exception e) {                        if (key != null) {                            key.cancel();                            if (key.channel() != null) {                                key.channel().close();                            }                        }                    }                }            }        } catch (IOException e) {            e.printStackTrace();        }        if (selector != null) {            try {                selector.close();            } catch (IOException e) {                e.printStackTrace();            }        }    }    private void handle(SelectionKey key) throws IOException {        if (key.isValid()) {            SocketChannel socketChannel = (SocketChannel) key.channel();            if (key.isConnectable()) {                if (socketChannel.finishConnect()) {                    socketChannel.register(selector, SelectionKey.OP_READ);                    doWrite(socketChannel);                }            }            if (key.isReadable()) {                ByteBuffer readBuffer = ByteBuffer.allocate(1024);                int readBytes = socketChannel.read(readBuffer);                if (readBytes > 0) {                    readBuffer.flip();                    byte[] bytes = new byte[readBuffer.remaining()];                    readBuffer.get(bytes);                    System.out.println("recv server content: " + new String(bytes));                } else if (readBytes < 0) {                    key.cancel();                    socketChannel.close();                }            }        }    }    private void doWrite(SocketChannel socketChannel) {        Scanner scanner = new Scanner(System.in);        new Thread(() -> {            while (scanner.hasNext()) {                try {                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);                    writeBuffer.put(scanner.nextLine().getBytes());                    writeBuffer.flip();                    socketChannel.write(writeBuffer);                } catch (Exception e) {                }            }        }).start();    }}

作者:克里斯朵夫李维

链接:https://juejin.im/post/5dfae986518825122671c846


顶一下()     踩一下()

热门推荐

发表评论
0评