程序员:Netty系列入门案例

Netty

Posted by MySelf 猫叔 on February 7, 2020

本博客 猫叔的博客,转载请申明出处

阅读本文约 “15分钟”

适读人群:Java 中级

学习笔记,Netty系列的学习教程,可能很多部分内容是摘抄,不过自己重新做了整理,相关案例也更新了自己的理解。

目录情况

Image Text

Netty简介

官方解释:Netty是一个异步的事件驱动的网络应用程序框架,用于快速开发可维护的高性能的协议服务器和客户端。——摘自Netty:Home

词义拆解:

  • 异步:无需阻塞等待线程执行结果,允许后续操作,直到其他线程执行完成后,再回调通知此线程。
  • 事件驱动:事件驱动体系结构(Event-Driven Architecture)是使用事件来触发解耦后服务之间的通信,事件是状态的更改或更新,例如加入到购物车中的课程这一事件。一般具有三个核心组件:事件生产者、事件路由器、事件使用者,路由器负责将事件进行过滤并推送给使用者,生产者与使用者分离,使他们可以独立扩展、部署。
  • 协议:网络通信的参与方必须遵循相同的规则,这套规则称为协议(protocol),它最终体现为在网络上传输的数据包的格式。

特点

设计优雅

  • 统一接口:提供了统一的异步I/O编程接口Channel,可针对多种传输类型的统一接口(阻塞和非阻塞套接字)
  • 异步非阻塞:采用异步非阻塞的I/O类库,基于Reactor模式实现
  • 事件驱动模型:基于灵活且可扩展的事件模型,让我们可以专注关注业务逻辑层
  • UDP协议:除了支持TCP也支持UDP(用户数据报协议),因UDP不用在客户和服务器之间建立一个连接,且没有超时重发等机制,所以传输速度很快
  • 责任链模式:ChannelPipeline基于责任链模式开发,便于业务逻辑的拦截,定制和扩展

上手易用

  • 自主配置:允许用户通过启动参数配置的形式选择Reactor单线程模型、Reactor多线程模型或Reactor主从多线程模型
  • 资料丰富:你可以在其官网查看详细的Javadoc用户指南或者部分案例

高性能

  • 零拷贝:TCP接收和发送缓冲区使用直接内存代替堆内存,避免了内存复制

  • 高效的并发编程:通过读写锁、volatile、线程安全容器等提升并发性能
  • 无锁化的串行设计:为了尽可能地避免锁竞争带来的性能损耗,可以通过串行化设计,既消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁

健壮抗压

  • 持续维护:其修复了已经发现的JDK NIO BUG,降低了开发人员的编程难度

  • 链路的有效性检测:TCP层面、协议层、应用层的心跳检测
  • 规避NIO BUG(Netty的解决策略)
    • 1、根据该BUG的特征,首先侦测该BUG是否发生
    • 2、将问题Selector上注册的Channel转移到新建的Selector上
    • 3、老问题的Selector关闭,使用新建的Selector替换

安全稳定

  • 协议:完整的 SSL / TLS 和 StartTLS 的支持
  • 范围广:运行在受限的环境例如 Applet 或 OSGI
  • 稳定:Netty对JDK的线程池进行了封装和改造,但是,本质上仍然是利用了线程池和线程安全队列简化了多线程编程,同时通过对象计数器对Netty的ByteBuf等内置对象进行细粒度的内存申请和释放,对非法的对象引用进行检测和保护

行业应用

Netty可以适用的行业非常广,因为设计优雅、高性能,其可以在多个行业有所应用,比如在互联网行业一般会作为PRC框架使用,而游戏行业中也进场需要其作为通信组件即多协议栈特点,可以在游戏行业发挥其高性能通信,还有在通信行业里,因其异步高性能、高可靠性等,它在互联网上也有许多开源或者教学的IM案例,等等…

个人经验

如果你或你的团队在找寻一个高性能且成熟稳定的NIO框架,那么一定要选择Netty

我在第一次了解Netty是因为项目需要支持TCP并发长连接的解决方案,而在互联网上找寻了许久,因为服务端是Java写的,且在了解Netty的机制与服务能力后,便开始了和Netty的不解之缘。

希望你也能有所收获!

Netty环境搭建

即使你是初学编程的小白,你可以跟着这个小节一步一步构建自己的开发环境,以便于后续学习Netty相关代码实战环境。

本小节选择的开发工具是IDEA,其是一款目前Java开发工程师比较常用的开发工具,而javaMaven的版本的选择,本着学有所成的目的,希望大家可以和课程保持一致。

配置Java

如果你的电脑已经安装了JDK,那么请你先验证一下它的版本。

打开电脑cmd,输入:

java -version

如下是本次演示的java版本。

image-20200104111453974

如果你还没安装JDK,那么可以到Oracle官网下载。

image-20200112071259266

下载地址:https://www.oracle.com/technetwork/java/javase/downloads/index.html

注意:我们仅需下载JDK即可

下载后进行安装即可。

安装后还要进行环境变量的配置,在系统变量中新建JAVA_HOME、CLASSPATH两个。

image-20200112071521243

JAVA_HOME : C:\Program Files\Java\jdk1.8.0_191(Windows上安装的默认值)

CLASSPATH : .;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tolls.jar;

还有在Path中添加两个地址

image-20200112071550602

%JAVA_HOME%\bin

%JAVA_HOME%\jre\bin

在Linux上则将${JAVA_HOME}/bin添加到执行路径上。

以上配置好后,请再运行cmd,输入:Java -version,验证电脑的Java版本是否显示正常。

配置Maven

如果你的电脑已经安装了Maven,那么请你先验证一下它的版本。

打开电脑cmd,输入:

mvn -v

如下是本次演示的Maven版本。

image-20200104114438695

如果你还没安装Maven,那么可以到官网下载。

image-20200112071747829

下载地址:https://maven.apache.org/download.cgi

将文件解压到指定的目录下,如:F:\Maven\apache-maven-3.6.3

在系统变量中新建MAVEN_HOME

image-20200112071814559

MAVEN_HOME : F:\Maven\apache-maven-3.6.3

还有在Path中添加一个地址

image-20200112071836650

%MAVEN_HOME%\bin

在Linux上则将${MAVEN_HOME}/bin添加到执行路径上。

以上配置好后,请再运行cmd,输入:mvn -v,验证电脑的Maven版本是否显示正常。

之后可以再打开:F:\Maven\apache-maven-3.6.3\conf 下的settings,修改为阿里镜像

<mirrors>
    <mirror>
      <id>nexus-aliyun</id>
      <mirrorOf>central</mirrorOf>
      <name>Nexus aliyun</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
</mirrors>

安装IDE

本次教程使用IDEA,大家请根据自己的电脑情况下载对应的版本,本次教程的IDEA版本是Windows的Community。

image-20200112071912375

下载地址:https://www.jetbrains.com/idea/download/#section=windows

如果你已经有自己熟练的IDE,那么也可以用于学习,并不会影响学习质量。

安装完成后,需要再配置下IDE的Maven,如下图,你可以修改Maven路径、settings文件及本地仓库。

image-20200104122636289

引入Netty

用IDEA构建一个简单的SpringBoot项目,大家这时可以在Project SDK的配置上选择我们一开始配置JDK,如下图

image-20200104122909694

项目新建后,请打开pom.xml文件,引入Netty资源,关于相关框架的Maven资源都可以到以下网站搜索

https://mvnrepository.com/

Netty地址:https://mvnrepository.com/artifact/io.netty/netty-all

netty-all的maven地址

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.44.Final</version>
</dependency>

引入到Pom.xml文件中,如下

<dependencies>
		<!--省略部分代码-->

		<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.44.Final</version>
		</dependency>
		
		<!--省略部分代码-->
	</dependencies>

以上,netty引入成功,环境搭建完成。

存在问题

1、javac显示不是内部或外部命令

操作系统win10、win7

配置如上文所示时,可以将PATH的路径改为绝对路径

C:\Program Files\Java\jdk1.8.0_191\bin

C:\Program Files\Java\jdk1.8.0_191\jre\bin

2、mvn显示不是内部或外部命令

jdk的环境变量配置有错,或者是M2_HOME路径有错

操作系统win10、win7

配置如上文所示时,可以将PATH的路径改为绝对路径

F:\Maven\apache-maven-3.6.3\bin

3、系统存在两个JDK版本

将默认启动的JDK版本中java.exe所在路径加入到操作系统PATH的首位

4、maven版本不兼容

输入:mvn -v,如果报“Exception in thread “main” java.lang.UnsupportedClassVersionError: org/apache/maven/cli/MavenCli : Unsupported major.minor version 51.0”的错误,可以更新新版的maven,解决问题。

Netty服务端创建

流程图理解

下图简单讲述一个Netty服务端构建的基本流程操作,并不涉及具体的业务逻辑。

image-20200112134508478

部分代码

如下展示部分代码与上图流程一致。

EventLoopGroup acceptorGroup = new NioEventLoopGroup();
try {
	//1、ServerBootstrap引导类
    ServerBootstrap b = new ServerBootstrap();
    //2、NioEventLoopGroup接受新连接及读/写处理
    b.group(acceptorGroup)
        //3、指定传输类型Channel
        .channel(NioServerSocketChannel.class)
        //4、添加业务Handler
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
            	socketChannel.pipeline().addLast(...业务Handler);
                }
         });
    //5、绑定服务器及端口
    ChannelFuture f = b.bind(port).sync();
    //6、监听服务器Channel关闭
    f.channel().closeFuture().sync();
}finally {
    //6、释放资源
    acceptorGroup.shutdownGracefully();
}

流程讲解

​ 对于一个基本的Netty服务端而言,他需要绑定到对应的服务器上同时在其上监听端口以保证可以接受传入的连接请求,还需要给他配置Channel,将入站消息及时通知给我们所定义的业务Handler中进行处理。

​ 我们一开始创建了一个ServerBootStrap实例,他是Netty启动NIO服务端的辅助启动类,可以为我们降低服务端的开发难度,同时构建了一个NioEventLoopGroup来接受和处理新的连接,当然你也可以创建两个Reactor线程组,一个用于服务端接受客户端的连接,一个用于进行SocketChannel的网络读写。我们还指定了Channel的类型为NioServerSocketChannel,其功能对应JDK NIO 类库中的ServerSocketChannel类,在大部分情况下,你可以配置NioServerSocketChannel的TCP参数,比如将它的backlog设置为1024等。

复杂的点来了,绑定I/O事件的ChildChannelHandler类,其实它有点类似Reactor中的Handler类,主要用于处理网络I/O,例如对消息的编解码等。可是为什么还有一个ChannelInitializer类呢?

这也是Netty美妙的地方,当一个新的连接被服务端接受时,一个新的子Channel会创建,而ChannelInitializer就会把我们的业务Handler实例添加到这个子Channel的ChannelPipeline中。

接下来是绑定服务器及端口,通过sync()我们可以阻塞当前的Thread,直到绑定操作完成为止,同时我们还要继续使用sync()去监听服务端的Channel,直到它被关闭,我们才能去关闭EventLoopGroup还有其他的所有资源。

以上也是Netty服务端的创建流程,相比其传统的NIO服务端,其大大简化了开发的复杂程度。

代码展示

以下是下文演示案例的服务端代码。

package com.demo.timer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @ClassName MkTimeServer
 * @Description 服务端
 * @Author Java猫说
 * @Date 2020/1/4 0004 14:02
 **/
public class MkTimeServer {

    public static void main(String[] args) throws Exception {
        int port = 8080;
        //启动服务端
        new MkTimeServer().run(port);
    }

    void run(int port) throws Exception{
        EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        try {
            //引导类
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptorGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //MkTimeServerHandler属于业务Handler
                            socketChannel.pipeline().addLast(new MkTimeServerHandler());
                        }
                    });
            //阻塞直到异步绑定服务器完成
            ChannelFuture f = b.bind(port).sync();
            //阻塞直到Channel关闭
            f.channel().closeFuture().sync();
        }finally {
            acceptorGroup.shutdownGracefully();
        }
    }

}

Netty客户端创建

流程图理解

下图简单讲述一个Netty客户端构建的基本流程操作,并不涉及具体的业务逻辑。

image-20200112145419340

部分代码

如下展示部分代码与上图流程一致。

EventLoopGroup acceptorGroup = new NioEventLoopGroup();
try {
    //1、Bootstrap引导类
	Bootstrap b = new Bootstrap();
    //2、NioEventLoopGroup创建连接及处理出/入站数据
    b.group(group)
        //3、指定传输类型为NioSocketChannel类型
        .channel(NioSocketChannel.class)
        //4、添加业务Handler
    	.handler(new ChannelInitializer<SocketChannel>() {
        	@Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
            	socketChannel.pipeline().addLast(...业务Handler);
            }
         });
    //5、连接远程指定的host、port节点
    ChannelFuture f = b.connect(host,port).sync();
    //6、监听服务器的Channel关闭
    f.channel().closeFuture().sync();
}finally {
    //6、释放资源
    group.shutdownGracefully();
}

流程讲解

​ 对于一个基本的Netty客户端而言,其实我们可以发现它与服务端的流程很相似,如果说ServerBootStrap实例,他是Netty启动NIO服务端的辅助启动类,可以为我们降低服务端的开发难度,那么其实BootStrap也是Netty为我们提供的NIO客户端的辅助启动类。

AbstractBootstrap类是ServerBootstrap及Bootstrap的基类,感兴趣的朋友可以看看后续章节的源码解析。

与服务端不同的是,客户端的Channel需要设置为NioSocketChannel,同样你可以设置相关的选择,然后为其添加Handler。

当客户端完成并启动BootStrap辅助类,我们需要调用connect()方法发起异步连接,然后调用sync()同步等待连接成功。

当客户端连接关闭时,释放线程资源,并退出客户端主函数。

代码展示

以下是下文演示案例的服务端代码。

package com.demo.timer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @ClassName MkTimeClient
 * @Description 客户端
 * @Author Java猫说
 * @Date 2020/1/4 0004 14:03
 **/
public class MkTimeClient {

    public static void main(String[] args) throws Exception {
        int port = 8080;
        String host = "127.0.0.1";
        new MkTimeClient().connect(port, host);
    }

    void connect(int port, String host) throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //MkTimeClientHandler属于业务Handler
                            socketChannel.pipeline().addLast(new MkTimeClientHandler());
                        }
                    });
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

}

Netty 第一次运行

准备

在阅读本章之前,请先学习前两节,即Netty的服务端与客户端创建。

如果你能了解服务端与客户端的创建流程与步骤,那么接下来的学习可以更加高效。

流程逻辑

image-20200104182900894

这个例子以服务端和客户端的时间传输为例,给各位读者介绍Netty的服务端与客户端,让各位可以简单的搭建并运行。

编码过程涉及的部分知识点也会有所介绍,如果各位对于部分组件有兴趣也可以跳到对应的词条阅读相关的源码介绍。

如上图的项目架构逻辑图,本次学习的项目案例是使用Netty构建一个主动给连接发送时间戳的服务端,服务端的作用是启动后监听连接的客户端,每当有一个客户端连接时,新建一个Channel并在其ChannelPipeline后追加服务业务MkTimeServerHandler实例,业务Handler负责检测到新连接时主动向客户端发送当前系统时间戳。

客户端的作用是启动后根据指定的Host和Port去连接服务端,同时客户端Channel的ChannelPipeline也会有对应处理时间戳的客户业务MkTimeClientHandler实例,负责主动连接服务端,读取服务端发送的系统时间戳并打印到控制台并关闭客户端服务。

服务端业务讲解

我们的MkTimeServerHandler类第一步就是需要继承ChannelInboundHandlerAdapter类,首先了解下为什么需要继承它,ChannelInboundHandlerAdapter是ChannelHandler的适配器之一,其对应的还有ChannelOutboundHandlerAdapter,其中ChannelInboundHandler负责处理处理进站数据和所有状态更改事件,而ChannelOutboundHandler负责处理出站数据,允许拦截各种操作。

因为服务端需要第一时间判断新连接并主动向客户端发送系统时间戳,因此我们继承了ChannelInboundHandlerAdapter。

那么ChannelInboundHandlerAdapter负责的所有状态更改事件是挺多的,我们今天就先学习演示Demo中的两个。

适配器提供的多个与Channel生命周期相关的方法之一,channelActive指当Channel处于活跃时,即Channel连接且准备就绪时。这也满足我们业务场景的需求,在检测到连接的时候,获取当前的系统时间戳,并创建一个Netty定义的ByteBuf字节串用于保存时间戳,通过ChannelHandlerContext调用writeAndFlush方法发送给客户端。

这里的ChannelHandlerContext是ChannelPipeline用来直接管理ChannelHandler的“替身”。

一般我们在写这一类的业务Handler时,为了避免连接异常,一般都会实现exceptionCaught方法,其可以在捕获到异常时,打印并关闭这个Channel通道,这也是增强了代码的健壮性。

服务端业务代码

package com.demo.timer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;

import java.util.Date;

/**
 * @ClassName MkTimeServerHandler
 * @Description 发送系统时间
 * @Author Java猫说
 * @Date 2020/1/4 0004 14:03
 **/
public class MkTimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("One Channel Connect");
        //获取系统时间戳的字符串
        String serverTime = new Date(System.currentTimeMillis()).toString();
        //创建一个 ByteBuf 保存特定字节串
        ByteBuf resp = Unpooled.copiedBuffer(serverTime.getBytes());
        //将 ByteBuf 发送给客户端
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        //异常关闭
        ctx.close();
    }
}

客户端业务讲解

在客户端的MkTimeClientHandler类中,我们也同样继承了ChannelInboundHandlerAdapter,这里就不再赘述了。

channelRead也是Channel生命周期相关的方法之一,当Channel读取到消息时调用,即服务端发送消息,我们可以通过这个方法获取到服务端的时间消息。

我们通过ByteBuf接收,因为在服务端发送时也同一个了这个类型,并创建一个等长的byte数组,同时在将ByteBuf中的内容传输到byte数组,最后转化为String打印在控制台。

在这个业务中,我们在获取到服务端系统时间戳后就关闭了连接,我们通过调用ChannelHandlerContext的close方法,关闭连接,并在操作完成后通知ChannelFuture,因为无论关闭成功或失败,都无法再使用这个连接。

客户端业务代码

package com.demo.timer;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


/**
 * @ClassName MkTimeClientHandler
 * @Description 接受时间消息
 * @Author Java猫说
 * @Date 2020/1/4 0004 14:04
 **/
public class MkTimeClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m = (ByteBuf) msg;
        byte[] req = new byte[m.readableBytes()];
        m.readBytes(req);
        String serverTime = new String(req);
        System.out.println(serverTime + " From Server.");
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

运行

1、运行服务端,在IDEA中启动MkTimeServer的main函数。

image-20200112161000732

启动成功!

image-20200112161109665

2、运行客户端,在IDEA中启动MkTimeClient的main函数。

image-20200112161154768

启动成功!

image-20200112161255226

同时连接服务端,并接收到服务端发送的时间消息,打印到了控制台,且自动关闭客户端。

服务端在控制台也检测到客户端的连接并发送了系统时间,从控制台日志中可以看到,如下图。

image-20200112161438189

注意点

  • exceptionCaught(…)

https://github.com/netty/netty/issues/4721

normanmaurer:exceptionCaught(…) is only called for inbound exceptions. All outbound exceptions must by handled in a listener by design. If you always want to have exceptions handled in exceptionCaught(…) just add a ChannelOutboundHandler that will an listener for every outbound operation.

在这个issues中曾提过,exceptionCaught 只会捕获 inbound handler的exception, outbound exceptions 需要在writeAndFlush方法里加上listener来监听消息是否发送成功。

我是MySelf,还在坚持学习技术与产品经理相关的知识,希望本文能给你带来新的知识点。

公众号:Java猫说

学习交流群:728698035

现架构设计(码农)兼创业技术顾问,不羁平庸,热爱开源,杂谈程序人生与不定期干货。

Image Text