Netty in Action:ChannelHandler和ChannelPipeline

本章包含

ChannelHandler和ChannelPipeline APIs

检测资源泄露

异常处理

在前一章,你学习了Netty的数据容器ByteBuf。在这一章我们会在你已经学过的知识的基础上探讨Netty的数据流和处理模块。你会开始看到这个框架的一些重要元素被组合到一起了。

你已经了解到,ChannelHandler在一个ChannelPipeline中被链在一起,将所有的处理逻辑组织起来。我们会在这一章学习有关这两个类的各种用例,以及另一个重要的相关类,ChannelHandlerContext。

理解所有这些组件之间的相互作用是用Netty创建一个模块化的,可复用应用的关键。

6.1 ChannelHandler家族

为了给ChannelHandler的深入学习做好准备,我们先花一些时间来巩固下Netty的组件模型。

6.1.1 Channel的生命周期

接口Channel定义了一个简单强大,并且和ChannelInboundHandler API密切相关的状态模型。四种Channel状态如表6.1所示。

表6.1 Channel生命周期状态

状态 描述
ChannelUnregistered Channel已创建,还未注册到一个EventLoop上
ChannelRegistered Channel已经注册到一个EventLoop上
ChannelActive Channel是活跃状态(连接到某个远端),可以收发数据
ChannelInactive Channel未连接到远端

一个Channel正常的生命周期如图6.1所示。随着状态发生变化,相应的event产生。这些event被转发到ChannelPipeline中的ChannelHandler来采取相应的操作。

图6.1 Channel状态模型

6.1.2 ChannelHandler的生命周期

接口ChannelHandler定义的生命周期相关的操作列在表6.2。这些操作在ChannelHandler被添加到一个ChannelPipeline,或者从一个ChannelPipeline中移除时被调用。这里的每个方法都包含一个ChannelHandlerContext做为输入参数。

表6.2 ChannelHandler生命周期方法

类型 描述
handlerAdded 当ChannelHandler被添加到一个ChannelPipeline时被调用
handlerRemoved 当ChannelHandler从一个ChannelPipeline中移除时被调用
exceptionCaught 处理过程中ChannelPipeline中发生错误时被调用

Netty定义了下面两个重要的ChannelHandler子接口

- ChannelInboundHandler——处理输入数据和所有类型的状态变化

- ChannelOutboundHandler——处理输出数据,可以拦截所有操作

在下面的小节里,我们会详细讨论这两个接口。

6.1.3 ChannelInboundHandler接口

表6.3列出了接口ChannelInboundHandler生命周期相关的方法。当收到数据,或者相关Channel的状态改变时,这些方法被调用。像我们之前提到的,这些方法和Channel的生命周期密切相关。

表6.3 ChannelInboundHandler方法

类型 描述
channelRegistered 当一个Channel注册到EventLoop上,可以处理I/O时被调用
channelUnregistered 当一个Channel从它的EventLoop上解除注册,不再处理I/O时被调用
channelActive 当Channel变成活跃状态时被调用;Channel是连接/绑定、就绪的
channelInactive 当Channel离开活跃状态,不再连接到某个远端时被调用
channelReadComplete 当Channel上的某个读操作完成时被调用
channelRead 当从Channel中读数据时被调用
channelWritabilityChanged 当Channel的可写状态改变时被调用。通过这个方法,用户可以确保写操作不会进行地太快(避免OutOfMemoryError)或者当Channel又变成可写时继续写操作。Channel类的isWritable()方法可以用来检查Channel的可写状态。可写性的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()来设定。
userEventTriggered 因某个POJO穿过ChannelPipeline引发ChannelnboundHandler.fireUserEventTriggered()时被调用

当一个ChannelInboundHandler实现类重写channelRead()方法时,它要负责释放ByteBuf相关的内存。Netty为此提供了一个工具方法,ReferenceCountUtil.release(),如下所示。

代码清单6.1 释放消息资源

对未释放的资源,Netty会打印一个警告级别(WARN-level)的日志消息,让你很方便地找到代码中出问题的实例。但是用这种方式管理资源有些麻烦。一个更简单的替代方法就是用SimpleChannelInboundHandler。下面的代码是6.1的另一种形式,说明了SimpleChannelInboundHandler的用法。

代码清单6.2 使用SimpleChannelInboundHandler

x

因为SimpleChannelInboundHandler自动释放资源,任何对消息的引用都会变成无效,所以你不能保存这些引用待后来使用.

6.1.6小节有一个关于引用处理更详细的讨论。

6.1.4 ChannelOutboundHandler接口

输出的操作和数据由ChannelOutBoundHandler处理。它的方法可以被Channel,ChannelPipeline和ChannelHandlerContext调用。

ChannelOutboundHandler有一个强大的功能,可以按需推迟一个操作,这使得处理请求可以用到更为复杂的策略。比如,如果写数据到远端被暂停,你可以推迟flush操作,稍后再试。

表6.4列出了所有ChannelOutboundHandler自己定义的方法(省掉了那些继承自ChannelHandler的方法)。

表6.4 ChannelOutboundHandler方法

类型 描述
bind(ChannelHandlerContext,SocketAddress,ChannelPromise) 请求绑定Channel到一个本地地址
connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) 请求连接Channel到远端
disconnect(ChannelHandlerContext, ChannelPromise) 请求从远端断开Channel
close(ChannelHandlerContext,ChannelPromise) 请求关闭Channel
deregister(ChannelHandlerContext, ChannelPromise) 请求Channel从它的EventLoop上解除注册
read(ChannelHandlerContext) 请求从Channel中读更多的数据
flush(ChannelHandlerContext) 请求通过Channel刷队列数据到远端
write(ChannelHandlerContext,Object, ChannelPromise) 请求通过Channel写数据到远端

CHANNELPROMISE VS. CHANNELFUTURE

ChannelOutboundHandler的大部分方法都用了一个ChannelPromise输入参数,用于当操作完成时收到通知。ChannelPromise是ChannelFuture的子接口,定义了可写的方法,比如setSuccess(),或者setFailure(),而ChannelFuture则是不可变对象。

接下来,让我们来看下那些让编写定制化ChannelHandler变得更容易的适配器类

6.1.5 ChannelHandler适配器类

刚开始尝试些写你自己的ChannelHandler时,你可以用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter这两个类。这两个提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现。它们继承了共同的父接口ChannelHandler的方法,扩展了抽象类ChannelHandlerAdapter。类层级关系如图6.2所示。

图6.2 ChannelHandlerAdapter类层级关系

ChannelHandlerAdapter还提供了一个工具方法isSharable()。如果类实现带@Sharable注解,那么这个方法就会返回true,意味着这个对象可以被添加到多个ChannelPipeline中(如2.3.1小节所述)。

ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中的方法调用相关ChannelHandlerContext中的等效方法,因此将事件转发到管道中的下一个ChannelHandler。

如果想在你自己的handler中用到这些适配器类,只需要扩展它们,重写那些你想要定制的方法。

6.1.6 资源管理

无论何时你对数据操作ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write(),你需要确保没有资源泄露。也许你还记得上一章我们提到过,Netty采用引用计数来处理ByteBuf池。所以,在你用完一个ByteBuf后,调整引用计数的值是很重要的。

为了帮助你诊断潜在的问题, Netty提供了ResourceLeakDetector类,它通过采样应用程序1%的buffer分配来检查是否有内存泄露。这个过程的开销是很小的。

如果泄露被检测到,会产生类似下面这样的日志消息:

LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable
advanced leak reporting to find out where the leak occurred. To enable
advanced leak reporting, specify the JVM option
'-Dio.netty.leakDetectionLevel=ADVANCED' or call
ResourceLeakDetector.setLevel().

Netty目前定义了四种内存泄露检测的级别,如表6.5所示。

This chapter requires login to view full content. You are viewing a preview.

Login to View Full Content

Course Curriculum

3

框架与 I/O:Spring、Netty 与 Web 容器

理解 Spring Boot 自动装配、AOP 与事务原理,掌握 Netty Reactor 模型及 Tomcat 连接处理机制,构建高内聚、易扩展的应用服务层。
4

高性能中间件:消息、缓存与存储

熟练运用 MySQL 索引/事务、Redis 缓存策略、Kafka/RocketMQ 消息可靠性,以及 ZooKeeper 分布式协调,搭建稳定、解耦的分布式数据底座。
6

云原生:容器化、可观测性与工程效能

通过 Docker/K8s 实现弹性部署,集成 Metrics/Logs/Traces 构建可观测体系,推动 DevOps 与自动化,让架构在云上持续交付与进化。