1. 说明

NIO主要包括 Selector Channel Buffer 三部分

  • Buffer 缓冲区,实际上为一块内存
  • Channel 通道,为数据链接的载体
  • Selector 选择器,NIO主要的协调者,通过对SelectorKey的轮询实现对不同订阅的调用处理

具体详见
Java NIO (图解+秒懂+史上最全) 已经说的很细了
Java面试常考的 BIO,NIO,AIO 总结()

后续主要提供一个简单的演示,实际要复杂的多

2. 缓冲区

以Intbuffer为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void intBuffer(){
//创建缓冲区
IntBuffer intBuffer = IntBuffer.allocate(10);
//写数据
for (int i = 0; i < 10; i++) {
intBuffer.put(i);
}
//翻转 需要注意的是翻转了也可以写的,但是会挨个儿覆盖数据
intBuffer.flip();
//读取数据
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
System.out.println("position: "+intBuffer.position()); // position: 10
//转成写模式
intBuffer.clear();
}

3. 通道

通道与通道的交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 通道之间处理数据
*/
@Test
public void channel() throws IOException {
//创建一个文件输入流
FileInputStream fis = new FileInputStream("D:\\Projects\\kewen-blogs\\.gitignore");
//获取文件流的通道
FileChannel inChannel = fis.getChannel();
//创建一个文件输出流
FileOutputStream fos = new FileOutputStream("D:\\data\\.gitignore");
//获取文件流的通道
FileChannel outChannel = fos.getChannel();
//写流 这里面用了 MappedByteBuffer
inChannel.transferTo(0, inChannel.size(), outChannel);
}

通道与Buffer交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
* 通道和buffer的配置
*/
@Test
public void channelBuffer() throws IOException {
//创建一个文件输入流
FileInputStream fis = new FileInputStream("srcFile");
FileChannel inChannel = fis.getChannel();
//ByteBuffer.allocateDirect(1024);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//inChannel.read(byteBuffer) 通道里的数据需要用Buffer接收
while (inChannel.read(byteBuffer) != -1) {
//转换读模式
byteBuffer.flip();
//......这里直接处理buffer中的数据
//如果要写到另外一个文件,则可以将byteBuffer写入到另一个OutputStream的 FileChannel中
//转换写模式
byteBuffer.clear();
}
}

通道与socket交互

服务端创建对应通道,然后设置非阻塞式,其余的正常的读写,只不过通道的数据都需要buffer来存
客户端也创建对应通道,然后设置非阻塞式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ChannelSocketTest {
/**
* 服务端
*/
@Test
public void server() throws IOException, InterruptedException {
//1. 创建服务器监听Channel并绑定
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress("127.0.0.1", 8080));
//2.获取一个Channel并设置成非阻塞
SocketChannel accept = server.accept();
accept.configureBlocking(false);
//3.模拟读流并使用
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (accept.read(byteBuffer) != -1) {
byteBuffer.flip();
//处理逻辑,这里直接打印了就完事
System.out.println(new String(byteBuffer.array(),byteBuffer.position(),byteBuffer.limit()));
byteBuffer.clear();
Thread.sleep(500); //这里不休眠循环一直拿不到客户端的数据,一直都是空的
}
}

/**
* 客户端
*/
@Test
public void client() throws IOException, InterruptedException {
//1. 创建一个Channel并设置成非阻塞
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
//2. 请求连接的端口,非阻塞这里可能并没有连接上,就需要循环监听
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
//2.1 这里需要检测一下是否已经连接上了,没连接上需要等待继续连接
while (!socketChannel.finishConnect()){
System.out.println("暂时未连接上");
Thread.sleep(500);
}
//3. 发送数据
while (true){
System.out.println("准备发送");
socketChannel.write(ByteBuffer.wrap("hello".getBytes()));
Thread.sleep(500);
}
}
}

4. selector

selector作为选择器,直接在socket工程中演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

public class SelectTest {

@Test
public void selectorServer() throws IOException, InterruptedException {
//主要是添加和检测使用了selector,可以监听多个ServerSocketChannel,

//1. ** 添加selector
Selector selector = Selector.open();
//2. 创建服务器监听Channel并绑定端口
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress("127.0.0.1", 8080));
//3. ** 注册到selector中 ServerSocketChannel 只能注册Accept监听,在Acceptable中完成获取到的SocketChannel的注册
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("注册完成");
while (selector.select() > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
Thread.sleep(1000);
SelectionKey key = it.next();
if (key.isConnectable()) {
System.out.println("isConnectable--------");
//已连接事件
} else if (key.isAcceptable()) {
//2.获取一个Channel并设置成非阻塞
System.out.println("准备Acceptable--------");
SocketChannel accept = server.accept();
accept.configureBlocking(false);
//接收事件,可以执行 accept(), accept.configureBlocking(false)等
accept.register(selector, SelectionKey.OP_READ);
} else if (key.isWritable()) {
System.out.println("isWritable--------");
SocketChannel channel = (SocketChannel) key.channel();
channel.write(ByteBuffer.wrap("server hello".getBytes()));
} else if (key.isReadable()) {
//这里都是用socketChannel
System.out.println("isReadable--------");
SocketChannel channel = (SocketChannel)key.channel();
//从channel中读数据 channel.read()
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//这里不使用while循环了,直接读取就可
while (channel.read(byteBuffer) > 0) {
channel.read(byteBuffer);
byteBuffer.flip();
//处理逻辑,这里直接打印了就完事
System.out.println(new String(byteBuffer.array(),byteBuffer.position(),byteBuffer.limit()));
byteBuffer.clear();
}
}
it.remove();
}
}
server.close();
}
@Test
public void client1() throws IOException, InterruptedException {
//client("客户端0");
new Thread(()->{client("客户端1");}).start();
new Thread(()->{client("客户端2");}).start();
new Thread(()->{client("客户端3");}).start();
Thread.sleep(50000);
}
@SneakyThrows
private void client(String clientName){
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8080));
socketChannel.configureBlocking(false);
while (!socketChannel.finishConnect()) {
System.out.println("正在连接中......");
}
int i=0;
while (true){
socketChannel.write(ByteBuffer.wrap((clientName+": hello"+i++).getBytes()));
Thread.sleep(5000);
}
}

}