概念
Netty 是基于 Java NIO 的异步事件驱动的网络应用框架,使用 Netty 可以快速开发网络应用,Netty 提供了高层次的抽象来简化 TCP 和 UDP 服务器的编程,但是你仍然可以使用底层的 API。
Netty 的内部实现是很复杂的,但是 Netty 提供了简单易用的API从网络处理代码中解耦业务逻辑。Netty 是完全基于 NIO 实现的,所以整个 Netty 都是异步的。
Netty 是最流行的 NIO 框架,它已经得到成百上千个商用项目的验证,许多框架和开源组件的底层 RPC 通信都使用了 Netty,如 Dubbo、Elasticsearch 等等。下面是官网给出的一些 Netty 的特性:
设计方面
对各种传输协议提供统一的 API(使用阻塞和非阻塞套接字时候使用的是同一个 API,只是需要设置的参数不一样)。
基于一个灵活、可扩展的事件模型来实现关注点清晰分离。
高度可定制的线程模型——单线程、一个或多个线程池。
真正的无连接数据报套接字(UDP)支持(自 3.1 起)。
易用性
完善的 Javadoc 文档和示例代码。
不需要额外的依赖,JDK 5 (Netty 3.x) 或者 JDK 6 (Netty 4.x) 已经足够。
性能
更好的吞吐量,更低的等待延迟。
更少的资源消耗。
最小化不必要的内存拷贝。
安全性
完整的 SSL/TLS 和 StartTLS 支持
对于初学者,上面的特性我们在脑中有个简单了解和印象即可, 下面开始我们的实战部分。
引用
工程搭建
简单的入门工程
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class D10NettyApp { public static void main(String[] args) { int availabledProcessors = NettyRuntime.availableProcessors(); System.out.println("Available processors: " + availabledProcessors); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new KHttpServerInitializer()); ChannelFuture future = bootstrap.bind(8080).sync(); System.out.println("启动完成"); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
|
上面的服务端用到了初始化器 KHttpServerInitializer,需要在初始化器中向 ChannelPipeline 添加相关的处理器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| public class KHttpServerInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel sc) throws Exception { ChannelPipeline pipeline = sc.pipeline(); pipeline.addLast( new HttpServerCodec());
pipeline.addLast(new ChannelHandler() { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println(Thread.currentThread().getName()+": handlerAdded"); }
@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println(Thread.currentThread().getName()+":handlerRemoved"); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(Thread.currentThread().getName()+":exceptionCaught"); } });
pipeline.addLast(new SimpleChannelInboundHandler<HttpObject>() {
@Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { System.out.println(msg.getClass().getName()); if (msg instanceof HttpRequest) { HttpRequest request = (HttpRequest) msg; request.method(); String uri = request.uri(); System.out.println("Uri: " + uri); } if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg; ByteBuf buf = content.content(); System.out.println("ByteBuf: "+buf.toString(io.netty.util.CharsetUtil.UTF_8)); ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); response.headers().add(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().add(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
ctx.writeAndFlush(response); } } });
pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); pipeline.addLast("fullHttpRequestServerChannelHandler", new SimpleChannelInboundHandler<FullHttpRequest>() { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { ctx.channel().remoteAddress(); System.out.println("请求方法名称:" + request.method().name()); System.out.println("uri:" + request.uri()); ByteBuf buf = request.content(); System.out.print(buf.toString(CharsetUtil.UTF_8));
ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); response.headers().add(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().add(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
ctx.writeAndFlush(response); } }); } }
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public class D10NettyClientApp {
public static void main(String[] args) throws Exception { String host = "127.0.0.1"; int port = 8080;
EventLoopGroup group = new NioEventLoopGroup();
try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new SimpleChannelInboundHandler<FullHttpResponse>() { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception { FullHttpResponse response = msg; response.headers().get(HttpHeaderNames.CONTENT_TYPE); ByteBuf buf = response.content(); System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8)); }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { URI uri = new URI("http://127.0.0.1:8080"); String msg = "Are you ok?"; FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()); ctx.channel().writeAndFlush(request); } }); } });
ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync();
} finally { group.shutdownGracefully(); } } }
|
SpringBoot工程
配置 ServerBootstrap ,和普通项目差不多的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Configuration public class NettyConfig {
@Bean public ServerBootstrap serverBootstrap() throws UnknownHostException, InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(new NioEventLoopGroup(),new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)) .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(65535)) .addLast(new NettyServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE,true); ChannelFuture future = serverBootstrap.bind(8080) .sync() .addListener(future1 -> { if (future1.isSuccess()) { System.out.println("Netty server started on port 8080"); } else { System.err.println("Failed to start Netty server"); future1.cause().printStackTrace(); } }); future.channel().closeFuture().sync();
return serverBootstrap;
} }
|
创建请求/响应处理类,具体的业务逻辑可以在这里进行扩展
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class NettyServerHandler extends SimpleChannelInboundHandler<HttpRequest> { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { DefaultFullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer("Hello from Netty!", CharsetUtil.UTF_8) );
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
boolean keepAlive = HttpUtil.isKeepAlive(msg); if (!keepAlive) { ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } else { ctx.writeAndFlush(response); } } }
|
启动SpringBoot
1 2 3 4 5 6
| @SpringBootApplication public class D100NettySpringBootApp { public static void main(String[] args) { SpringApplication.run(D100NettySpringBootApp.class,args); } }
|