Netty 基础实践

警告
本文最后更新于 2024-01-06,文中内容可能已过时。

Netty 基础实践

参考官方博客:Netty.docs: User guide for 4.x

现在我们使用通用应用程序或库来相互通信。例如,我们经常使用 HTTP 客户端库从 web 服务器检索信息,并通过 web 服务调用远程过程调用。然而,通用协议或其实现有时不能很好地扩展。这就像我们不使用通用的 HTTP 服务器来交换巨大的文件、电子邮件消息和近乎实时的消息,比如金融信息和多人游戏数据。我们需要的是一个高度优化的协议实现,专门用于特殊目的。例如,您可能希望实现一个针对基于 ajax 的聊天应用程序、媒体流或大文件传输进行优化的 HTTP 服务器。您甚至可能想要设计和实现一个完全符合您需求的全新协议。另一种不可避免的情况是,您必须处理遗留专有协议以确保与旧系统的互操作性。在这种情况下,重要的是我们可以多快地实现该协议,同时不牺牲最终应用程序的稳定性和性能。

Netty 项目致力于提供一个异步事件驱动的网络应用程序框架和工具,用于快速开发可维护的高性能和高可伸缩性协议服务器和客户端。

换句话说,Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发网络应用程序,如协议服务器和客户端。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器的开发。快速和简单并不意味着最终的应用程序将遭受可维护性或性能问题。Netty 是根据从许多协议 (如 FTP、SMTP、HTTP 和各种基于二进制和文本的遗留协议) 的实现中学到的经验精心设计的。因此,Netty 成功地找到了一种方法,在不妥协的情况下实现易于开发、性能、稳定性和灵活性。

本篇博客通过简单的示例介绍 Netty 的核心结构,让您快速入门。当你在本章结束时,你将能够立即在 Netty 上编写客户端和服务器。如果你更喜欢自上而下的学习方法,你可能想从 Chapter 2, Architectural Overview开始,然后回到这里。

关于 Netty 的基本概念和设计,请看《Netty 的设计》,在进行实践之间,建议先看一遍《Netty 的设计》,把握核心的概念。

简单实践

用 Netty 实现TIME 协议的客户端和服务端。连接刚建立的时候,服务端就发送一个包含 32 位整数的消息,而不接收任何请求,并在消息发送后关闭连接。

话不多说,直接上代码。

Netty 服务端:

Maven 添加依赖

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.101.Final</version>
</dependency>

编写通道入栈处理器,即ChannelInboundHandler接口的实现类TimeServerHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 监听到客户端连接时,发送当前时间
     *
     * @param ctx
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        // 注意,如果需要在这里进行重操作,比如查询数据库之类的,一定要额外启动一个线程去做,而不要直接在当前线程去做,
        // 因为这样的重操作会阻塞当前线程,导致跟当前 Channel 共用一个线程(EventLoop)的其他 Channel 的事件处理被阻塞

        // 分配四个字节的空间
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        // 写入数据并刷新,为了方便,这里写入的类型是 ByteBuf,其他类型的数据则需要序列化
        // writeAndFlush 已经自动调用了 ByteBuf 的 release 方法
        final ChannelFuture f = ctx.writeAndFlush(time);
        // 监听客户端是否收到消息,收到后关闭连接
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                // 确定是同一个 Future
                assert f == future;
                // 管理上下文,即关闭连接
                ctx.close();
            }
        });
    }

    /**
     * 监听到异常时,关闭连接
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

注意:使用了ByteBuf之后要进行释放ByteBuf#release方法进行释放,方便 GC 回收。不过调用ChannelHandlerContext#writeAndFlush方法的时候已经对其进行了释放,因此我们不需要显示调用。

ChannelHandlerContext对象提供各种操作,使您能够触发各种 I/O 事件和操作。比如发送消息调用ChannelHandlerContext#write,比如关闭连接调用ChannelHandlerContext.close()

这里简单介绍一下如何服务端如何向客户端推送消息(客户端到服务端也是一样的),方法很简单,调用ChannelHandlerContext#write即可,不过要注意,这个方法是带缓存的,即只有缓存满了才会将所有缓存的数据一并发送给客户端,如果想要调用write之后立即发送给客户端,则需要刷新缓存,即调用ChannelHandlerContext#flush,方便起见可以直接调用ChannelHandlerContext#writeAndFlush方法。

还有一点要注意,Netty 中几乎所有操作都是异步的,ChannelHandlerContext#write也是,其会返回一个ChannelFuture,我们可以通过为ChannelFuture添加ChannelFutureListener回调来监听写入动作,比如实现一个ChannelFutureListener,在其中重写operationComplete方法来监听客户端收到消息这个事件。

ChannelFutureListener有一些自带的回调操作可以直接使用:

  • ChannelFutureListener.CLOSE:这个ChannelFuture完成后,关闭ChannelFuture关联的Channel

  • ChannelFutureListener.CLOSE_ON_FAILURE:这个ChannelFuture失败的时候,关闭ChannelFuture关联的Channel

  • ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE:这个ChannelFuture失败的时候,在ChannelFuture关联的ChannelChannelPipeline中触发exceptionCaught方法

总的来说,这个ChannelHandler的逻辑是,跟客户端建立连接之后,就返回 4 个字节的时间信息,然后关闭这个跟这个客户端的连接,注意只是关闭跟这一个客户端的连接不是关闭整个服务端。

然后创建服务端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class SimpleServer {
    private int port;

    public SimpleServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // 监听新的连接的 EventLoopGroup,也就是处理 Server Channel 的 EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 处理已经建立连接的消息发送的 EventLoopGroup,也就是处理 Child Channel 的 EventLoopGroup
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 后文的很多 child 开头的 API,设置的就是 Child Channel
        try {
            // 注意,服务端用 ServerBootstrap,客户端用 Bootstrap
            ServerBootstrap b = new ServerBootstrap();
            // 服务端需要两个 EventLoopGroup
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // 指定 Child Channel 的 ChannelHandler
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口,并开始接收客户端的连接请求
            ChannelFuture f = b.bind(port).sync();

            // 等到 server socket 关闭,也就是 Server Channel 的关闭
            // 在当前例子中,不会出现 server socket 关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅的关闭 EventLoopGroup
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        // 启动服务端
        new SimpleServer(port).run();
    }
}

ChannelInitializer是一个特殊的ChannelHandler,用于帮助用户配置新创建的Channel。您很可能希望通过添加一些ChannelHandler来配置新创建的ChannelChannelPipeline,以实现您的网络应用程序。随着应用程序变得复杂,您可能会向ChannelPipeline中添加更多ChannelHandler,并最终将这个匿名类提取到来给一个名字,变成一个通用的顶级类。

您还可以设置特定于不同的Channel实现的参数。比如当前我们正在编写一个TCP/IP服务器,所以我们可以设置 socket 套接字的选项,如tcpNoDelaykeepAlive。请参考ChannelOption的和特定的ChannelConfig实现的 API 文档,以获得有关支持的ChannelOptions的描述。设置的方式就是通过 option()childOption()option()用于NioServerSocketChannel,它接受客户端的连接请求。childOption()用于ServerChannel接受并建立连接的Channel,在本例中为NioSocketChannel

关闭 Netty 的方式很简单,直接关闭所有的EventLoopGroup即可,即调用EventLoopGroup#shutdownGracefully方法,注意这个方法是异步的,并不会立即执行,返回对象为Future

启动服务端的方式很简单,直接运行 main 方法即可。

Netty 客户端:

同样的,先创建客户端的ChannelHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 注意,如果需要在这里进行重操作,比如查询数据库之类的,一定要额外启动一个线程去做,而不要直接在当前线程去做,
        // 因为这样的重操作会阻塞当前线程,导致跟当前 Channel 共用一个线程(EventLoop)的其他 Channel 的事件处理被阻塞

        // 获取传递过来的对象
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            // 关闭连接
            // 注意是关闭这一个连接这一个 Channel,并不是关闭整个服务端
            ctx.close();
        } finally {
            // 释放缓存
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        // 关闭连接
        ctx.close();
    }
}

注意,channelRead(ChannelHandlerContext ctx, Object msg)中的 msg 的数据类型就是 Object,想要转化为指定类型的手动转换。这里因为我们知道消息类型是ByteBuf,所以可以直接转换为ByteBuf

ByteBuf是一个引用计数(reference-counted)的对象(实现了ReferenceCounted接口的对象),必须通过release()方法显式释放。请记住,释放传递给ChannelHandler的任何引用计数对象是ChannelHandler的责任。通常,channelRead() 处理程序方法是这样实现的

1
2
3
4
5
6
7
8
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

这个ChannelHandler的逻辑是收到服务端的返回信息之后,就关闭跟服务端的连接,然后整个客户端就关闭了。

然后再创建客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static void main(String[] args) throws Exception {
        // 可以通过 java -jar 来传递 host 和端口参数,这里就省了,直接在代码中硬编码
        // String host = args[0];
        // int port = Integer.parseInt(args[1]);
        // 指定服务端的 IP 和端口
        String host = "localhost";
        int port = 8080;
        // EventLoopGroup 继承自 ScheduledExecutorService,本质上就是一个线程池
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 注意,服务端用 ServerBootstrap,客户端用 Bootstrap
            Bootstrap b = new Bootstrap();
            // 客户端只需要一个 EventLoopGroup 即可
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 指定 Channel 的 ChannelHandler
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // 启动客户端,连接服务端
            ChannelFuture f = b.connect(host, port).sync();

            // 等待连接关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭 EventLoopGroup
            workerGroup.shutdownGracefully();
        }
    }

直接运行 main 方法,就可以连接到服务端,此时如果服务端还没启动,则会报错:Connection refused: no further information: localhost/127.0.0.1:8080

先启动服务端,然后再启动客户端,然后客户端就会输出

1
Tue Dec 19 07:24:23 CST 2023

然后客户端就关闭了,为什么,其实是因为我们在服务端的ChannelHandler和客户端的ChannelHandler中都显式地调用了ChannelHandlerContext.close()来显示地关掉跟对方的连接,其实只要服务端和客户端只要有一方调用了这个方法,服务端跟客户端之间的连接就会结束。因此如果我们想保持这个连接,就不要调用这个方法。注意,跟,ChannelHandlerContext.write方法一样,ChannelHandlerContext.close也是异步的,返回ChannelFuture,因此调用了这个方法,可能连接也不会立即关闭。Netty 中的方法真的都是异步的

此外我们可以注意到,客户端关闭了,服务端一直没有关闭,因为服务端只是关闭了跟这一个客户端的连接,依然可以保持跟其他客户端的连接同时继续监听端口和新的客户端建立连接,

其实只是线程池没有结束而已

客户端怎么给服务端发消息呢?调用ChannelHandlerContext#writeAndFlush方法即可,可直接参考上文中服务端的ChannelHandler的代码,这里就不写粘贴码了,很简单。服务端和客户端的ChannelInboundHandlerAdapter其实是同一套接口。

基于流的传输的问题

基于流的数据传输比如 TCP/IP 会有一个问题,就是收到的数据会保存在 socket(在 Netty 里就是 Channel)的接受数据缓冲区(Socket Buffer)中,问题是缓冲区里都是按照字节来缓存数据的,而不是按照消息报文来缓存的,简而言之就是丢失了字节的分段信息,操作系统来读取这个缓冲区的时候,无法确定两个字节是否来自于同一个报文,这样你就无法按照消息发送端的分段来读取消息,例如第一次收到了两个字节,比如字节 1 和字节 2,第二次收到了字节 3,现在我们来读这个缓冲区,我们根本无法确定字节 1 和字节 2 是不是来自于一个请求,我们将无法完整地读取到一个完整的报文,举个最简单的例子,比如当我们通过 UTF-16 编码字符串的时候,有的字符占用两个字节,如果我们只读取了一个字节,那么将无法正确解码,最终乱码。

有两种解决方案

以固定长度读取缓冲区

我们在发送数据的时候,保证每个报文的长度都是固定的,同时在读取接受数据的缓存的时候,再创建一个跟报文长度相同的固定长度的内部缓存,这个内部缓存满了就解码/消费一次。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private ByteBuf buf;

    // 添加此 ChannelHandler 的时候执行,此时可以进行一些初始化操作
    // 这里初始化了一个定长的 ByteBuf 
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4);
    }

    // 移除此 ChannelHandler 的时候执行,此时可以进行一些资源回收,
    // 这里的操作时回收 ByteBuf 相关的资源 
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release();
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m);
        m.release();

        // 内部缓存满了,则开始读取
        if (buf.readableBytes() >= 4) {
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

这种方式的缺点很明显:

  • 不够灵活,如果我要穿一个超大的数据包,还要分片,

  • 容易造成流量的浪费,如果我确定的长度是 4 个字节,但是我实际只需要穿 1 个字节,那就有 3 个字节的空间是浪费掉的

用一个专门的 ChannelHandler 来处理

将整理缓冲区字节为正常消息的工作用一个专门的ChannelHandler来处理,即解码器。同时,Netty 提供了很多可继承的类来帮助你编写解码器(和编码器)。比如ByteToMessageDecoder,通过ByteToMessageDecoder,上面那个方案的代码可以简化为

1
2
3
4
5
6
7
8
9
public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;
        }
        out.add(in.readBytes(4));
    }
}

ByteToMessageDecoderChannelInboundHandler 的实现类,专门用来处理字节转消息的问题,每当接收到新数据时,ByteToMessageDecoder会调用decode()方法,并传入ByteToMessageDecoder内部维护的累积缓冲区,当累积缓冲区中没有足够的数据时,decode()可以决定不向 out 添加任何数据。ByteToMessageDecoder将在接收到更多数据时再次调用decode()。如果decode()向 out 添加一个对象,则表示解码器成功解码了一条消息。ByteToMessageDecoder将丢弃累积缓冲区的已读部分。请记住,您不需要一次解码多条消息,一次一条地解析即可。ByteToMessageDecoder将继续调用 decode() 方法,直到它没有向 out 添加任何内容。

然后将这个 ChannelHandler 添加到 TimeClientHandler的前面,注意,添加顺序很重要,因为 Netty 会根据ChannelHandler的添加顺序来执行ChannelHandler,解析Channel事件,比如channelRead,因此编解码ChannelHandler需要在接收消息的ChannelHandler的前面,否则当我们在TimeClientHandler监听channelRead的时候,将不能成功地进行类型转换。

ChannelHandler生效的顺序是由其注册到ChannelPipeline的时候顺序决定的。因此解码器必须在内容解析ChannelHandler之前执行。

1
2
3
4
5
6
b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

ReplayingDecoder继承了ByteToMessageDecoder,并进一步简化

1
2
3
4
5
6
public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

除了ByteToMessageDecoderReplayingDecoder之外,Netty 还提供了其他开箱即用的解码器,使我们能够非常轻松地实现大多数协议,二进制协议的实现类为io.netty.example.factorial,基于文本的协议的实现类io.netty.example.telnet

通信协议

其实基于流的传输的问题,本质上是一个通信协议的问题,通信协议被设计出来,就是为了解决基于流的传输的问题的,通信协议本质上是一种格式,发送端按照规定的格式传输二进制流,接收端按照规定的格式解析二进制流。

关于这个观点的详细描述,请看《HTTP 协议基础》的HTTP 的作用是什么小节

Netty 也提供了对常见通信协议的支持,比如HTTP/1.1HTTP/2.0,并为其提供了开箱即用的解码器,这些编解码器,本质上也是ChannelHandler

  • HTTP/1.1的支持:HttpServerCodec

  • HTTP/2.0的支持:Http2FrameCodec(快要弃用了)

对其他协议的支持,以及相关的例子,请看更多例子小节。

传输自定义对象

上文的例子传输的是ByteBuf,那如何传输自定义的对象呢?很简单,添加此对象的编码器和解码器即可

参考官网提供的例子:objectecho

在客户端和服务端都添加此自定义对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CustomizeTransportObject implements Serializable {

    private String name;

    private int age;

    private LocalDateTime birthDate = LocalDateTime.now();


    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public LocalDateTime getBirthDate() {
        return birthDate;
    }

    public void setBirthDate(LocalDateTime birthDate) {
        this.birthDate = birthDate;
    }
}

然后修改TimeServerHandler发送消息的代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 监听到客户端连接时,发送当前时间
     *
     * @param ctx
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        // 注意,如果需要在这里进行重操作,比如查询数据库之类的,一定要额外启动一个线程去做,而不要直接在当前线程去做,
        // 因为这样的重操作会阻塞当前线程,导致跟当前 Channel 共用一个线程(EventLoop)的其他 Channel 的事件处理被阻塞

        CustomizeTransportObject transportObject = new CustomizeTransportObject();
        transportObject.setName("xiashuo.xyz");
        transportObject.setAge(20);
        // 这里已经自动调用了对象的 release 方法
        final ChannelFuture f = ctx.writeAndFlush(transportObject);


        // 监听客户端是否收到消息,收到后关闭连接
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                // 确定是同一个 Future
                assert f == future;
                // 管理上下文,即关闭连接
                ctx.close();
            }
        });
    }

    /**
     * 监听到异常时,关闭连接
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

在启动服务端的时候,添加对象的编码器和解码器,这里添加的是通用的编码器和解码器,而且要注意添加顺序,因为 Netty 会根据ChannelHandler的添加顺序来执行ChannelHandler,解析Channel事件,比如channelRead,因此编解码ChannelHandler需要在接收消息的ChannelHandler的前面,否则当我们在TimeClientHandler监听channelRead的时候,将不能成功地进行类型转换。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class SimpleServer {
    private int port;

    public SimpleServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // 监听新的连接的 EventLoopGroup,也就是处理 Server Channel 的 EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 处理已经建立连接的消息发送的 EventLoopGroup,也就是处理 Child Channel 的 EventLoopGroup
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 后文的很多 child 开头的 API,设置的就是 Child Channel
        try {
            // 注意,服务端用 ServerBootstrap,客户端用 Bootstrap
            ServerBootstrap b = new ServerBootstrap();
            // 服务端需要两个 EventLoopGroup
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // 添加对象编码器和解码器
                            ch.pipeline().addLast(new ObjectEncoder());
                            ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));

                            // 指定 Child Channel 的 ChannelHandler
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口,并开始接收客户端的连接请求
            ChannelFuture f = b.bind(port).sync();

            // 等到 server socket 关闭,也就是 Server Channel 的关闭
            // 在当前例子中,不会出现 server socket 关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅的关闭 EventLoopGroup
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        // 启动服务端
        new SimpleServer(port).run();
    }
}

客户端的ChannelHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 注意,如果需要在这里进行重操作,比如查询数据库之类的,一定要额外启动一个线程去做,而不要直接在当前线程去做,
        // 因为这样的重操作会阻塞当前线程,导致跟当前 Channel 共用一个线程(EventLoop)的其他 Channel 的事件处理被阻塞

        if (!(msg instanceof CustomizeTransportObject)) {
            throw new IllegalArgumentException("msg is not CustomizeTransportObject");
        }

        CustomizeTransportObject msgObj = (CustomizeTransportObject) msg;
        System.out.println(msgObj.getName() + ":" + msgObj.getAge() + ":" + msgObj.getBirthDate());
        ctx.close();

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        // 关闭连接
        ctx.close();
    }
}

客户端线程,主要的修改就是添加对象的编码器和解码器,这里添加的是通用的编码器和解码器,而且要注意添加顺序,因为 Netty 会根据ChannelHandler的添加顺序来执行ChannelHandler,解析Channel事件,比如channelRead,因此编解码ChannelHandler需要在接收消息的ChannelHandler的前面,否则当我们在TimeClientHandler监听channelRead的时候,将不能成功地进行类型转换。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class SimpleClient {

    public static void main(String[] args) throws Exception {
        // 可以通过 java -jar 来传递 host 和端口参数,这里就省了,直接在代码中硬编码
        // String host = args[0];
        // int port = Integer.parseInt(args[1]);
        // 指定服务端的 IP 和端口
        String host = "localhost";
        int port = 8080;
        // EventLoopGroup 继承自 ScheduledExecutorService,本质上就是一个线程池
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 注意,服务端用 ServerBootstrap,客户端用 Bootstrap
            Bootstrap b = new Bootstrap();
            // 客户端只需要一个 EventLoopGroup 即可
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 添加对象编码器和解码器
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
                            ClassResolvers.cacheDisabled(null)));

                    // 指定 Channel 的 ChannelHandler
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // 启动客户端,连接服务端
            ChannelFuture f = b.connect(host, port).sync();

            // 等待连接关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭 EventLoopGroup
            workerGroup.shutdownGracefully();
        }
    }
}

先启动服务端,再启动客户端,客户端输出

1
xiashuo.xyz:20:2023-12-19T12:24:07.241

注意,ObjectEncoderObjectDecoder都已经被弃用了,你也可以通过继承MessageToByteEncoder自定义一个编码类和通过继承ByteToMessageDecoder自定义一个解码器。

更多例子

io.netty.example包中。我们可以看到 Netty 提供的对多场景和网络种协议的支持,以及相关的示例,例如

有机会再好好研究研究 Netty 的这些例子,再好好研究面向 Socket 编程。

其实对协议的支持,本质上就是提供一个ChannelHandler来处理特定格式的字节流。

0%