1. 传统的阻塞式I/O
因为socket的accept函数,read函数,write函数是同步阻塞的,所以主线程不断调用socket的accept函数,轮询状态是established的TCP连接。
当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。
阻塞式I/O模型
阻塞式I/O的缺点
缺乏扩展性,严重依赖线程。Java的线程占用内存在512K-1M,线程数量过多会导致JVM内存溢出。大量的线程上下文切换严重消耗CPU性能。大量的I/O线程被激活会导致系统锯齿状负载。
2. NIO编程
同步非阻塞I/O模型
对于NIO来说,如果内核缓冲区中没有数据就直接返回一个EWOULDBLOCK错误,一般来说进程可以轮询调用read函数,当缓冲区中有数据的时候将数据复制到用户空间,而不用挂起线程。
在linux系统中,可以使用select/poll/epoll使用一个线程监控多个socket,只要有一个socket的读缓存有数据了,方法就立即返回,然后你就可以去读这个可读的socket了,如果所有的socket读缓存都是空的,则会阻塞,也就是将线程挂起。
NIO实际上是一个事件驱动的模型,NIO中最重要的就是多路复用器(Selector)。在NIO中它提供了选择就绪事件的能力,我们只需要把通道(Channel) 注册到Selector上,Selector就会通过select方法(实际上操作系统是通过epoll)不断轮询注册在其上的Channel,如果某个Channel上发生了读就绪、写就绪或者连接到来就会被Selector轮询出来,然后通过SelectionKey(Channel注册到Selector上时会返回和其绑定的SelectionKey)可以获取到已经就绪的Channel集合,否则Selector就会阻塞在select方法上。
多路复用的核心目的就是使用最少的线程去操作更多的通道,在其内部并不是只有一个线程。创建线程的个数是根据通道的数量来决定的,每注册1023个通道就创建1个新的线程。
不同类型通道支持的事件
NIO事件模型示意图:
ServerReactor
@Slf4jpublic class ServerReactor implements Runnable { private final Selector selector; private final ServerSocketChannel serverSocketChannel; private volatile boolean stop = false; public ServerReactor(int port, int backlog) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(new InetSocketAddress(port), backlog); serverSocket.setReuseAddress(true); serverSocketChannel.configureBlocking(false); // 将channel注册到多路复用器上,并监听ACCEPT事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } public void setStop(boolean stop) { this.stop = stop; } @Override public void run() { try { // 无限的接收客户端连接 while (!stop && !Thread.interrupted()) { int num = selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); // 移除key,否则会导致事件重复消费 it.remove(); try { handle(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } } catch (IOException e) { e.printStackTrace(); } if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handle(SelectionKey key) throws Exception { if (key.isValid()) { // 如果是ACCEPT事件,代表是一个新的连接请求 if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); // 相当于三次握手后,从全连接队列中获取可用的连接 // 必须使用accept方法消费ACCEPT事件,否则将导致多路复用器死循环 SocketChannel socketChannel = serverSocketChannel.accept(); // 设置为非阻塞模式,当没有可用的连接时直接返回null,而不是阻塞。 socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = socketChannel.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String content = new String(bytes); System.out.println("recv client content: " + content); ByteBuffer writeBuffer = ByteBuffer.allocate(1024); writeBuffer.put(("服务端已收到: " + content).getBytes()); writeBuffer.flip(); socketChannel.write(writeBuffer); } else if (readBytes < 0) { key.cancel(); socketChannel.close(); } } } }}ClientReactor
public class ClientReactor implements Runnable { final String host; final int port; final SocketChannel socketChannel; final Selector selector; private volatile boolean stop = false; public ClientReactor(String host, int port) throws IOException { this.socketChannel = SocketChannel.open(); this.socketChannel.configureBlocking(false); Socket socket = this.socketChannel.socket(); socket.setTcpNoDelay(true); this.selector = Selector.open(); this.host = host; this.port = port; } @Override public void run() { try { // 如果通道呈阻塞模式,则立即发起连接; // 如果呈非阻塞模式,则不是立即发起连接,而是在随后的某个时间才发起连接。 // 如果连接是立即建立的,说明通道是阻塞模式,当连接成功时,则此方法返回true,连接失败出现异常。 // 如果此通道处于阻塞模式,则此方法的调用将会阻塞,直到建立连接或发生I/O错误。 // 如果连接不是立即建立的,说明通道是非阻塞模式,则此方法返回false, // 并且以后必须通过调用finishConnect()方法来验证连接是否完成 // socketChannel.isConnectionPending()判断此通道是否正在进行连接 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else { socketChannel.register(selector, SelectionKey.OP_CONNECT); } while (!stop && !Thread.interrupted()) { int num = selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); // 移除key,否则会导致事件重复消费 it.remove(); try { handle(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } } catch (IOException e) { e.printStackTrace(); } if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handle(SelectionKey key) throws IOException { if (key.isValid()) { SocketChannel socketChannel = (SocketChannel) key.channel(); if (key.isConnectable()) { if (socketChannel.finishConnect()) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } } if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = socketChannel.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); System.out.println("recv server content: " + new String(bytes)); } else if (readBytes < 0) { key.cancel(); socketChannel.close(); } } } } private void doWrite(SocketChannel socketChannel) { Scanner scanner = new Scanner(System.in); new Thread(() -> { while (scanner.hasNext()) { try { ByteBuffer writeBuffer = ByteBuffer.allocate(1024); writeBuffer.put(scanner.nextLine().getBytes()); writeBuffer.flip(); socketChannel.write(writeBuffer); } catch (Exception e) { } } }).start(); }}作者:克里斯朵夫李维
链接:https://juejin.im/post/5dfae986518825122671c846
