1. 说明
NIO主要包括 Selector Channel Buffer 三部分
- Buffer 缓冲区,实际上为一块内存
- Channel 通道,为数据链接的载体
- Selector 选择器,NIO 的主要协调者,通过轮询 SelectionKey 来分发处理各通道上已就绪的订阅事件
具体详见
Java NIO (图解+秒懂+史上最全) 已经说得很细了
Java面试常考的 BIO,NIO,AIO 总结
后续主要提供一个简单的演示,实际应用要复杂得多
2. 缓冲区
以 IntBuffer 为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Test public void intBuffer(){ IntBuffer intBuffer = IntBuffer.allocate(10); for (int i = 0; i < 10; i++) { intBuffer.put(i); } intBuffer.flip(); while (intBuffer.hasRemaining()) { System.out.println(intBuffer.get()); } System.out.println("position: "+intBuffer.position()); intBuffer.clear(); }
|
3. 通道
通道与通道的交互
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
/**
 * Copies a file channel-to-channel with {@link FileChannel#transferTo}, without
 * routing the bytes through a user-space buffer.
 *
 * @throws IOException if either file cannot be opened or the transfer fails
 */
@Test
public void channel() throws IOException {
    // try-with-resources closes both streams and both channels, even when the copy fails.
    try (FileInputStream fis = new FileInputStream("D:\\Projects\\kewen-blogs\\.gitignore");
         FileChannel inChannel = fis.getChannel();
         FileOutputStream fos = new FileOutputStream("D:\\data\\.gitignore");
         FileChannel outChannel = fos.getChannel()) {
        long size = inChannel.size();
        long position = 0;
        // transferTo may move fewer bytes than requested, so keep going until
        // the whole file has been handed over.
        while (position < size) {
            long transferred = inChannel.transferTo(position, size - position, outChannel);
            if (transferred <= 0) {
                // No progress (e.g. the source shrank underneath us): stop instead of spinning.
                break;
            }
            position += transferred;
        }
    }
}
|
通道与Buffer交互
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
/**
 * Reads a file through a {@link FileChannel} into a {@link ByteBuffer}, one
 * buffer-full at a time, until end-of-file.
 *
 * <p>The loop only shows the flip/clear protocol; the bytes are not consumed.
 *
 * @throws IOException if the file cannot be opened or read
 */
@Test
public void channelBuffer() throws IOException {
    // try-with-resources closes the stream and its channel even if a read fails.
    try (FileInputStream fis = new FileInputStream("srcFile");
         FileChannel inChannel = fis.getChannel()) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        // read() returns -1 once the channel has reached end-of-stream.
        while (inChannel.read(byteBuffer) != -1) {
            // Switch to read mode; data between position and limit would be consumed here.
            byteBuffer.flip();
            // Back to write mode so the next read() starts at the beginning of the buffer.
            byteBuffer.clear();
        }
    }
}
|
通道与socket交互
服务端创建对应的通道并设置为非阻塞模式,其余读写操作与普通方式一致,只是通道中的数据都需要通过 Buffer 来存取
客户端也创建对应通道,然后设置非阻塞式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class ChannelSocketTest {
@Test public void server() throws IOException, InterruptedException { ServerSocketChannel server = ServerSocketChannel.open(); server.bind(new InetSocketAddress("127.0.0.1", 8080)); SocketChannel accept = server.accept(); accept.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while (accept.read(byteBuffer) != -1) { byteBuffer.flip(); System.out.println(new String(byteBuffer.array(),byteBuffer.position(),byteBuffer.limit())); byteBuffer.clear(); Thread.sleep(500); } }
@Test public void client() throws IOException, InterruptedException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080)); while (!socketChannel.finishConnect()){ System.out.println("暂时未连接上"); Thread.sleep(500); } while (true){ System.out.println("准备发送"); socketChannel.write(ByteBuffer.wrap("hello".getBytes())); Thread.sleep(500); } } }
|
4. selector
selector作为选择器,直接在socket工程中演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| public class SelectTest {
@Test public void selectorServer() throws IOException, InterruptedException {
Selector selector = Selector.open(); ServerSocketChannel server = ServerSocketChannel.open(); server.bind(new InetSocketAddress("127.0.0.1", 8080)); server.configureBlocking(false); server.register(selector, SelectionKey.OP_ACCEPT); System.out.println("注册完成"); while (selector.select() > 0) { Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { Thread.sleep(1000); SelectionKey key = it.next(); if (key.isConnectable()) { System.out.println("isConnectable--------"); } else if (key.isAcceptable()) { System.out.println("准备Acceptable--------"); SocketChannel accept = server.accept(); accept.configureBlocking(false); accept.register(selector, SelectionKey.OP_READ); } else if (key.isWritable()) { System.out.println("isWritable--------"); SocketChannel channel = (SocketChannel) key.channel(); channel.write(ByteBuffer.wrap("server hello".getBytes())); } else if (key.isReadable()) { System.out.println("isReadable--------"); SocketChannel channel = (SocketChannel)key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while (channel.read(byteBuffer) > 0) { channel.read(byteBuffer); byteBuffer.flip(); System.out.println(new String(byteBuffer.array(),byteBuffer.position(),byteBuffer.limit())); byteBuffer.clear(); } } it.remove(); } } server.close(); } @Test public void client1() throws IOException, InterruptedException { new Thread(()->{client("客户端1");}).start(); new Thread(()->{client("客户端2");}).start(); new Thread(()->{client("客户端3");}).start(); Thread.sleep(50000); } @SneakyThrows private void client(String clientName){ SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8080)); socketChannel.configureBlocking(false); while (!socketChannel.finishConnect()) { System.out.println("正在连接中......"); } int i=0; while (true){ socketChannel.write(ByteBuffer.wrap((clientName+": hello"+i++).getBytes())); Thread.sleep(5000); } }
}
|