概念

Netty 是基于 Java NIO 的异步事件驱动的网络应用框架,使用 Netty 可以快速开发网络应用,Netty 提供了高层次的抽象来简化 TCP 和 UDP 服务器的编程,但是你仍然可以使用底层的 API。

Netty 的内部实现是很复杂的,但是 Netty 提供了简单易用的API从网络处理代码中解耦业务逻辑。Netty 是完全基于 NIO 实现的,所以整个 Netty 都是异步的。

Netty 是最流行的 NIO 框架,它已经得到成百上千的商业、商用项目验证,许多框架和开源组件的底层 rpc 都是使用的 Netty,如 Dubbo、Elasticsearch 等等。下面是官网给出的一些 Netty 的特性:

设计方面

对各种传输协议提供统一的 API(使用阻塞和非阻塞套接字时候使用的是同一个 API,只是需要设置的参数不一样)。
基于一个灵活、可扩展的事件模型来实现关注点清晰分离。
高度可定制的线程模型——单线程、一个或多个线程池。
真正的无数据报套接字(UDP)的支持(since 3.1)。
易用性

完善的 Javadoc 文档和示例代码。
不需要额外的依赖,JDK 5 (Netty 3.x) 或者 JDK 6 (Netty 4.x) 已经足够。
性能

更好的吞吐量,更低的等待延迟。
更少的资源消耗。
最小化不必要的内存拷贝。
安全性

完整的 SSL/TLS 和 StartTLS 支持
对于初学者,上面的特性我们在脑中有个简单了解和印象即可, 下面开始我们的实战部分。

引用

工程搭建

简单的入门工程

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class D10NettyApp {
public static void main(String[] args) {
int availabledProcessors = NettyRuntime.availableProcessors();
System.out.println("Available processors: " + availabledProcessors);
//构造两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//服务端启动辅助类
ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new KHttpServerInitializer()); //初始化器,自己定义初始化器的内容
ChannelFuture future = bootstrap.bind(8080).sync();
System.out.println("启动完成");
//等待服务端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

前面需要初始化器,初始化器中添加相关的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

public class KHttpServerInitializer extends ChannelInitializer<SocketChannel> {

protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
//处理http消息的编解码,netty实现了编码器,但是必须得自己带上
pipeline.addLast( new HttpServerCodec());

//添加自定义的ChannelHandler,主要是添加处理等相关的,一般调用一次
pipeline.addLast(new ChannelHandler() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println(Thread.currentThread().getName()+": handlerAdded");
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println(Thread.currentThread().getName()+":handlerRemoved");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(Thread.currentThread().getName()+":exceptionCaught");
}
});

//常用的处理器类型,使用SimpleChannelInboundHandler不用关系资源释放问题
//需要注意的是泛型的匹配,只有数据格式匹配的泛型才会走这个处理器否则会跳过,这里已经写流了,这里走了之后下一个fullHttpRequestServerChannelHandler就不走了
pipeline.addLast(new SimpleChannelInboundHandler<HttpObject>() {

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
//根据不同的请求类型执行对应操作
System.out.println(msg.getClass().getName());
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
request.method();
String uri = request.uri();
System.out.println("Uri: " + uri);
}
if (msg instanceof HttpContent) {

HttpContent content = (HttpContent) msg;
ByteBuf buf = content.content();
System.out.println("ByteBuf: "+buf.toString(io.netty.util.CharsetUtil.UTF_8));
ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
response.headers().add(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().add(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());

ctx.writeAndFlush(response);
}
}
});

//搭配起来使用,将请求和返回合并到FullHttpRequest里面去,不要HttpObjectAggregator的话FullHttpRequestServerChannelHandler无数据
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
//有了上一个组合的aggregator,就可以使用FullHttpRequest了,包含了请求和响应
pipeline.addLast("fullHttpRequestServerChannelHandler", new SimpleChannelInboundHandler<FullHttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
ctx.channel().remoteAddress();
//解析请求
System.out.println("请求方法名称:" + request.method().name());
System.out.println("uri:" + request.uri());
ByteBuf buf = request.content();
System.out.print(buf.toString(CharsetUtil.UTF_8));

//写流数据
ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
response.headers().add(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().add(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());

ctx.writeAndFlush(response);
}
});
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class D10NettyClientApp {

public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port = 8080;

EventLoopGroup group = new NioEventLoopGroup();

try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new SimpleChannelInboundHandler<FullHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
//读取服务器返回来的数据
FullHttpResponse response = msg;
response.headers().get(HttpHeaderNames.CONTENT_TYPE);
ByteBuf buf = response.content();
System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//建立连接是调用,向服务器发送数据
URI uri = new URI("http://127.0.0.1:8080");
String msg = "Are you ok?";
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));

// 构建http请求
// request.headers().set(HttpHeaderNames.HOST, "127.0.0.1");
// request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
// 发送http请求
ctx.channel().writeAndFlush(request);
}
});
}
});

// 启动客户端.
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();

} finally {
group.shutdownGracefully();
}
}
}

SpringBoot工程

  1. 配置 ServerBootstrap ,和普通项目差不多的方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    @Configuration
    public class NettyConfig {

    /**
    *
    *5. 性能优化
    *性能优化可以包括多个方面,以下是一些常见的优化策略:
    *线程池调整:根据实际负载调整 EventLoopGroup 的线程数。可以通过 NioEventLoopGroup 的构造函数指定线程数。
    *EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    *EventLoopGroup workerGroup = new NioEventLoopGroup();
    *TCP 参数调整:配置 TCP 相关参数,如 SO_BACKLOG、SO_KEEPALIVE 等,以提高连接的稳定性和性能。
    *内存管理:优化 Netty 的内存管理,如设置适当的缓冲区大小,减少内存复制操作。
    *日志记录:调整 Netty 的日志级别。生产环境中建议将日志级别设置为 WARN 或 ERROR 以减少日志对性能的影响。
    *资源释放:确保处理器中对资源的释放,如使用 try-finally 语句释放通道和其他资源。
    *异步处理:尽可能使用异步操作来避免阻塞,提高处理效率。
    *将 Spring Boot 与 Netty 结合使用,可以利用 Spring Boot 的强大特性与 Netty 的高性能网络能力。
    * 通过合理配置和优化,可以构建高效、可靠的网络应用。以上代码示例和优化策略是一个起点,具体的优化措施可以根据实际业务需求进行调整。
    *
    */
    @Bean
    public ServerBootstrap serverBootstrap() throws UnknownHostException, InterruptedException {
    //NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
    //factory.setAddress(InetAddress.getByName("127.0.0.1"));

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(new NioEventLoopGroup(),new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG))
    .addLast(new HttpServerCodec())
    .addLast(new HttpObjectAggregator(65535))
    .addLast(new NettyServerHandler());
    }
    })
    .option(ChannelOption.SO_BACKLOG, 128)
    .childOption(ChannelOption.SO_KEEPALIVE,true);
    ChannelFuture future = serverBootstrap.bind(8080)
    .sync()
    .addListener(future1 -> {
    if (future1.isSuccess()) {
    System.out.println("Netty server started on port 8080");
    } else {
    System.err.println("Failed to start Netty server");
    future1.cause().printStackTrace();
    }
    });
    future.channel().closeFuture().sync();

    return serverBootstrap;


    }
    }

  2. 创建请求响应处理类,这里执行扩展

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class NettyServerHandler extends SimpleChannelInboundHandler<HttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
    DefaultFullHttpResponse response = new DefaultFullHttpResponse(
    HttpVersion.HTTP_1_1,
    HttpResponseStatus.OK,
    Unpooled.copiedBuffer("Hello from Netty!", CharsetUtil.UTF_8)
    );

    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
    response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
    response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

    boolean keepAlive = HttpUtil.isKeepAlive(msg);
    if (!keepAlive) {
    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    } else {
    ctx.writeAndFlush(response);
    }
    }
    }
  3. 启动SpringBoot

    1
    2
    3
    4
    5
    6
    @SpringBootApplication
    public class D100NettySpringBootApp {
    public static void main(String[] args) {
    SpringApplication.run(D100NettySpringBootApp.class,args);
    }
    }