首页>>后端>>java->Netty 系列(2) — Netty 入门

Netty 系列(2) — Netty 入门

时间:2023-12-06 本站 点击:0

Netty 基础

Netty 简介

如果基于原生的 NIO 进行网络编程,有很多问题都需要自己进行大量的编程来实现,比如连接异常、网络闪断、粘包拆包、半包读写、网络拥塞、异常码流、请求排队等等。开发者需要深入掌握线程、IO、网络等相关概念,很容易导致代码复杂、晦涩,难以快速地写出高可靠性的实现。Java 自身的 NIO 设计更偏底层,其复杂性、扩展性等方面,存在一定的局限性。

在基础 NIO 之上,Netty 通过精巧设计的事件机制,将业务逻辑和无关技术逻辑进行隔离,封装了很多复杂的网络通信细节,提供了相对十分简单易用的 API,非常适合网络编程,开发者可以基于它开发出非常复杂的网络通信程序。

Netty 是一个异步的、基于事件驱动的 Client/Server 的网络应用程序框架,用以快速开发高吞吐量、低延时、高可靠性的网络服务器和客户端程序。Netty 是业界最流行的一个Java开源 NIO 框架 之一,广泛应用于各种分布式、即时通信和中间件中,例如 Dubbo、Elasticsearch、RocketMQ 等。

Netty 模块

从 Netty 官方的模块划分来看,主要包含三大模块:

Core:核心模块,包括零拷贝、API库、可扩展的事件模型等。

Transport Services:传输服务,包括 Socket、Datagram、HTTP Tunnel 等。

Protocol Support:协议支持,包括 HTTP、WebSocket、SSL、Google Protobuf、zlib/gzip 压缩与解压缩、Large File Transfer 大文件传输等等。

Netty 完全是 Java NIO 框架的一个超集,除了核心的事件机制等,Netty 还额外提供了很多功能,例如:

Netty 除了支持传输层的 UDP、TCP、SCTP协议,也支持 HTTP(s)、WebSocket 等多种应用层协议,它并不是单一协议的 API。

Netty 提供了一系列扩展的编解码框架,与应用开发场景无缝衔接,并且性能良好。

扩展了 Java NIO Buffer,提供了自己的 ByteBuf 实现,并且深度支持 Direct Buffer 等技术。

版本说明

本系列文章使用的 Netty 版本为 4.1.76.Final

Maven 依赖如下:

<dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>    <version>4.1.76.Final</version></dependency>

Netty Github:https://github.com/netty/netty

参考书籍:《Netty 权威指南》

Netty 核心组件

Demo 程序

下面先看下用 Netty 写的一个客户端/服务端网络通信程序,代码注释说明了每行代码的含义。

Netty 服务端

package com.lyyzoo.netty.netty;import java.nio.charset.StandardCharsets;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {    private final int port;    public NettyServer(int port) {        this.port = port;    }    /**     * 启动 Netty Server     */    public void start() {        // 负责和客户端建立网络连接的线程组        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        // 负责处理网络IO请求读取和处理的线程组        EventLoopGroup workerGroup = new NioEventLoopGroup(32);        try {            // Netty网络服务器(服务端启动类)            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap                    // 设置线程组                    .group(bossGroup, workerGroup)                    // 网络通信通道,负责监听指定的端口                    .channel(NioServerSocketChannel.class)                    // 网络通道处理器初始化                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel socketChannel) throws Exception {                            // SocketChannel 的处理管道                            ChannelPipeline channelPipeline = socketChannel.pipeline();                            // 添加一些处理器                            channelPipeline                                    // 自定义的服务端处理器                                    .addLast(new NettyServerHandler());                        }                    })                    // 设置全连接队列大小                    .option(ChannelOption.SO_BACKLOG, 1024)                    // 保持网络连接                    .childOption(ChannelOption.SO_KEEPALIVE, true)            ;            // 绑定要监听的端口,sync() 同步等待完成            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();            // 对关闭通道进行监听            channelFuture.channel().closeFuture().sync();        } catch (Exception e) {            e.printStackTrace();        } finally {            // 释放资源            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    static class NettyServerHandler extends ChannelInboundHandlerAdapter {        @Override        public void channelActive(ChannelHandlerContext ctx) throws Exception {            System.out.println("channel active...");        }        @Override        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {            System.out.println("channel read...");            // 读数据并处理请求            ByteBuf reqBuf = (ByteBuf) msg;            byte[] reqBytes = new byte[reqBuf.readableBytes()];            reqBuf.readBytes(reqBytes);            System.out.println("Request data: " + new String(reqBytes, StandardCharsets.UTF_8));            // 响应客户端请求            System.out.println("channel writing...");            ctx.channel().write(Unpooled.copiedBuffer("Hello Netty Client!", StandardCharsets.UTF_8));            ctx.channel().write(Unpooled.copiedBuffer("Hello World!!!", StandardCharsets.UTF_8));            ctx.channel().flush();        }        @Override        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {            System.out.println("channel read complete...");            ctx.flush();        }        @Override        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {            System.out.println("channel exception...");            cause.printStackTrace();            ctx.close();        }    }    public static void main(String[] args) {        // 启动 Netty Server        NettyServer nettyServer = new NettyServer(9000);        nettyServer.start();    }}

Netty 客户端

package com.lyyzoo.netty.netty;import java.nio.charset.StandardCharsets;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {    public static void main(String[] args) {        // 线程组        EventLoopGroup group = new NioEventLoopGroup(1);        try {            // 客户端启动类入口            Bootstrap bootstrap = new Bootstrap();            bootstrap                    .group(group)                    // 设置管道                    .channel(NioSocketChannel.class)                    .option(ChannelOption.TCP_NODELAY, true)                    // 管道初始化器                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel socketChannel) throws Exception {                            // SocketChannel 的处理管道                            ChannelPipeline channelPipeline = socketChannel.pipeline();                            // 添加处理器                            channelPipeline                                    // 自定义的客户端处理器                                    .addLast(new NettyClientHandler());                        }                    });            // connect() 发起异步连接,sync() 同步等待连接成功            ChannelFuture channelFuture = bootstrap.connect("localhost", 9000).sync();            // 等待客户端连接关闭            channelFuture.channel().closeFuture().sync();            System.out.println("Client close...");        } catch (Exception e) {            e.printStackTrace();        } finally {            // 释放资源            group.shutdownGracefully();        }    }    static class NettyClientHandler extends ChannelInboundHandlerAdapter {        @Override        public void channelActive(ChannelHandlerContext ctx) throws Exception {            System.out.println("channel active...");            ctx.channel().write(Unpooled.copiedBuffer("Hello Netty Server!", StandardCharsets.UTF_8));            ctx.channel().write(Unpooled.copiedBuffer("Hello World!!!", StandardCharsets.UTF_8));            ctx.channel().flush();        }        @Override        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {            System.out.println("channel read...");            ByteBuf resBuf = (ByteBuf) msg;            byte[] resBytes = new byte[resBuf.readableBytes()];            resBuf.readBytes(resBytes);            System.out.println("Response data: " + new String(resBytes, StandardCharsets.UTF_8));            // 关闭通道            ctx.channel().close();        }        @Override        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {            System.out.println("channel read complete...");            ctx.flush();        }    }}

Netty 核心组件

下面来看下Demo程序中用到的一些组件和配置。

ServerBootstrap

ServerBootstrap 是 Netty 服务端的启动类入口,主要就是通过它来设置服务端启动相关的参数。底层通过门面模式对各种能力进行抽象封装,尽量不让用户跟过多的底层API打交道,降低开发难度。与之相对应, Bootstrap 则是客户端的启动辅助类。

最后通过 bind() 方法调用操作系统的 bindlisten 函数,开始监听客户端请求。

EventLoopGroup

EventLoop 是 Netty 处理事件的核心机制,EventLoopGroup 类似于一个线程组,内部包含多个 EventLoop。EventLoop 的职责是处理所有注册到本线程多路复用器 Selector 上的 Channel,Selector 的轮询操作由 EventLoop 绑定的线程执行。

ServerBootstrap 设置了两个 EventLoopGroup,一个为 bossGroup,一个为 workerGroup,bossGroup 负责轮询监听端口;workerGroup 负责处理 Socket 连接请求。

ServerSocketChannel

接着 channel(NioServerSocketChannel.class) 设置了监听端口的管道为 NioServerSocketChannel,负责和底层操作系统打交道,监听端口,创建 SocketChannel

在 BIO 中对应的为 ServerSocket,在 NIO 中对应 ServerSocketChannel。

ChannelHandler

通过 childHandler() 设置管道初始化器,可以通过这个初始化器添加 SocketChannel 自定义处理器。Demo 中,我们在自定义的 NettyClientHandler 和 NettyServerHandler 中实现客户端与服务端间的网络数据交互和请求处理。

ChannelHandler 是 Netty 提供给用户用于扩展和定制的关键接口,利用 ChannelHandler 可以完成大多数的功能定制,例如业务逻辑、消息编码、心跳、安全认证、流量控制等。

ChannelPipeline

SocketChannel 在创建的时候就会初始化一个管道 ChannelPipeline,可以通过 SocketChannel 获取到这个管道,然后添加一系列自定义处理器。

网络事件以事件流的形式在 ChannelPipeline 中流转,ChannelPipeline 就是负责处理网络事件的责任链,负责管理和执行 ChannelHandler。

ChannelFuture

Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,ChannelFuture 对象就代表这个异步操作本身,这是 Netty 实现异步 IO 的基础之一。

Netty 的异步编程模型都是建立在 Future 与回调概念之上的,Netty 扩展了 Java 标准的 Future,提供了针对自己场景的特有 Future 定义。

SO_BACKLOG

option(ChannelOption.SO_BACKLOG, 128) 设置了一个 BACKLOG 参数,这个参数就对应了TCP三次握手的 ACCEPT(全连接) 队列的大小,这个参数就是在调用 listen 函数时所需的 backlog 参数 。

listen 函数的Linux头文件以及函数定义如下:

#include <sys/socket.h>int listen(int sockfd, int backlog);

listen 函数会根据传入的 backlog 参数与系统的 /proc/sys/net/core/somaxconn 取二者的较小值。如果 ACCEPT 队列满了,就会拒绝客户端连接。

SO_KEEPALIVE

childOption(ChannelOption.SO_KEEPALIVE, true) 设置了 KEEPALIVE 参数。

TCP网络连接通过三次握手建立后,默认在2个小时内没发送过任何网络包时,如果设置了 KEEPALIVE=true,此时就会向对方发送探测包,防止对方已经断开连接,而自己还占着资源。

维护TCP长连接时系统配置的参数有如下几个:

# 空闲多长时间发送探测包net.ipv4.tcp_keepalive_time = 7200# 探测失败的重试次数,如果多次探测对方都没有回应,则关闭自己这端的连接net.ipv4.tcp_keepalive_probes = 9# 发送探测包的周期net.ipv4.tcp_keepalive_intvl = 75

ByteBuf

当我们进行数据传输时,往往会用到缓冲区,常用的就是 NIO 中的 java.nio.Buffer。Netty 则扩展了 NIO Buffer,提供了自己的 ByteBuf 实现。

在 channelRead 中我们将 msg 转换为 ByteBuf,这是因为 SocketChannel 中接收的字节数据会放入 ByteBuf 缓冲区中。通过 ByteBuf 的 readableBytes 方法可以获取缓冲区可读的字节数,根据可读的字节数创建 byte 数组,通过 ByteBuf 的 readBytes 方法将缓冲区中的字节数组复制到新建的 byte 数组中,最后通过 new String 构造函数获取请求报文。在响应数据时,也需要将消息先写入缓冲区 ByteBuf 中,再写入 SocketChannel。

Netty 通信原理

我们通过下图结合Demo程序再来看下 Netty 的核心组件是如何工作的。

EventLoopGroup 维护了一组 EventLoop,EventLoop 用来处理 Channel 生命周期中发生的事件。每个 EventLoop 会绑定一个 Thread 和 Selector,Netty 中的 Channel 会注册到一个 EventLoop 中,最后其实就是注册到 Selector 上,由 Selector 来监听 Channel 的事件,监听到事件后就交由 Thread 来处理。

Netty 服务端通过 ServerBootstrap 来启动,ServerBootstrap 首先需要设置两个 EventLoopGroup,一个用于监听客户端的TCP连接请求;一个用于处理建立好连接的 Channel 的IO请求。

ServerBootstrap 调用 bind() 方法绑定地址时,底层会调用操作系统的 socket()、bind()、listen() 函数,然后得到一个监听通道 ServerSocketChannel。这个 ServerSocketChannel 会注册到 EventLoop 中,由其绑定的 Selector 来监听 ServerSocketChannel 中的事件。

Netty 客户端通过 Bootstrap 来启动,Bootstrap 需设置一个 EventLoopGroup。Bootstrap 调用 connect() 方法连接服务端,底层会调用操作系统的 socket()、connect() 函数。通过TCP三次握手建立连接后,双方都会得到一个 SocketChannel,然后注册到 EventLoop 中,由其绑定的 Selector 来监听 SocketChannel 中的事件。

通过TCP三次握手建立的连接 SocketChannel 初始化时就会创建一个 ChannelPipeline,然后由 ChannelInitializer 来初始化,添加各种输入/输出处理器,来处理IO请求。ChannelPipeline 会负责事件在职责链中的有序传播,职责链可以选择监听和处理自己关心的事件。

原文:https://juejin.cn/post/7100845891346497549


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/15913.html