Netty的多线程设计及客服IM系统中的应用
1.背景
前文《在线客服IM系统设计》中介绍了在线客服选择了netty作为开发框架,高效的线程模式是netty的优势之一,本文将介绍Netty的多线程的设计以及在客服IM系统中的应用。
Netty的多线程模式是典型的Reactor线程模式,因此要理解Netty多线程模式先从Reactor谈起。
2.Reactor线程模式
2.1 什么是Reactor?
Reactor
是一种广泛应用在服务器端开发的设计模式。我们知道,对于应用服务器,一个主要规律就是,CPU的处理速度是要远远快于IO速度的,如果CPU为了IO操作(例如从Socket读取一段数据)而阻塞显然是不划算的。好一点的方法是分为多进程或者线程去进行处理,但是这样会带来一些进程切换的开销,试想一个进程一个数据读了500ms,期间进程切换到它3次,但是CPU却什么都不能干,就这么切换走了,是不是也不划算?这时我们找到了事件驱动,或者叫回调的方式,来完成这件事情。这种方式就是,应用业务向一个中间人注册一个回调(event handler
),当IO就绪后,就这个中间人产生一个事件,并通知此handler进行处理。那么问题来了:我们如何知道IO就绪这个事件,谁来充当这个中间人?Reactor
模式的答案是:由一个不断等待和循环的单独进程(线程)来做这件事,它接受所有handler
的注册,并负责先在操作系统查询IO是否就绪,在就绪后就调用指定handler
进行处理,这个角色的名字就叫做Reactor
。
reactor单线程模式
操作系统的I/O就绪事件其实就是连接建立、读、写三种。建立网络连接对于操作系统而言初始化了一个保存了通讯双方IP地址和端口的socket,图中的acceptor
就是专门处理这一I/O就绪事件的handler。在Reactor
线程模型下,Reactor
、acceptor
和其他handler
都在同一线程执行。
2.2 Reactor多线程模式
单线程的缺点显而易见,当其中某个 handler
阻塞时,其他所有的 client 的 handler
都得不到执行,更严重的是,handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor
也被阻塞了)。这种单线程模型不能充分利用多核资源。
Reactor多线程模式
多线程I/O模式下负责处理连接事件的mainReactor
只有依然只有一个线程。而subReactor
是在一个线程池当中,可以有多个,它们负责去处理读写事件,发挥多核CPU的运算能力。
Reactor单线程模式
就好比一个餐厅里,招待新顾客的和用户点餐的是一个服务员负责,这样当一个用户在点餐时,其他进入餐厅的顾客便得不到接待。而多线程模式
下由一个服务员A单独负责招待新顾客,用户点餐由其他多个服务员负责。哪怕没有空闲服务员来帮用户点餐,服务员A也可以不停地招待顾客进入餐厅。这就保证了用户的连接事件不会受到其他阻塞操作的影响。
3.Netty的多线程模式
3.1 Netty的核心概念
Netty中定义了Channel,ChannelPipeline,ChannelHandler,ChannelEvent和ChannelFuture五个核心概念。
Channel
:表示一个与socket关联的通道。新连接建立时会初始化并绑定一个Channel。其实就是把客户端的连接在应用程序的层面抽象为了一个Channel。Channel是随着连接的创建而被初始化,随着连接的断开而被销毁。连接中消息的读写都会被抽象为channel中的事件.
ChannelPipeline
:存在于channel 中的管道。是channel中的通道,如果把channel看作一个处理消息的工厂,那么ChannelPipeline就是工厂里的流水线。读取消息时会有一个读事件会在这个通道中从头到尾走,而发送消息时会有一个写事件在这个通道中从尾到头走。在走的过程中会在这条流水线的各个环节被“加工”。一个事件不一定会走完整条通道,很有可能在某个节点完成处理。ChannelPipeline与Channel一对一绑定,共享生命周期。
ChannelHandler
:ChannelPipeline上的处理器,ChannelelHandler是串行的,可以比作流水线上的工人,处理流水线上的各个事件。ChannelHandler必须主动调用下一个ChannelHandler才会让事件向下传递。他们收到一个事件并处理完成后,要主动fireEvent()方法将事件传递给下一个ChannelHandler,下一个ChannelHandler才会继续处理事件,否则就会认为处理结束了。我们可以在Chanel创建之后随时在ChannelPipeline上添加和删除ChannelHandler。
ChannelEvent
:事件,channel中的一切活动包括消息读写,连接建立和断开,都会被抽象成事件。也可以主动在channel中主动产生或触发(fire)一个事件,该事件会沿着ChannelPipeline经过后续的ChannelHandler依次被处理。
ChannelFuture
:异步结果,当一个事件被处理时,可以直接以ChannelFuture的形式直接返回,不用在当前操作中被阻塞。可以通过 ChannelFuture得到最终的执行结果,具体的做法是在ChannelFuture添加监听器listener,当操作最终被执行完 后,listener会被触发。
简单来说,就是连接建立时,操作系统的连接在应用层表现为一个Channel
,Channel
中会创建一个ChannelPipeline
,可以任意在ChannelPipeline
中添加和删除ChannelHandler
,加入何种ChannelHandler
决定了应用程序会如何处理连接和读写事件,连接中发生的事件都会在Channel
中以ChannelEvent
形式触发,被ChannelHandler
处理。每次ChannelHandler
处理ChannelEvent
都是异步的,调用处理方法时,会马上返回一个ChannelFuture
。
channelPipeline中的ChannelHandler是串行的
3.2 Netty的线程池
netty的线程模型正是使用了Reactor多线程模式
。Netty启动时会初始化Boss
和Worker
两个线程池。Boss
中通常只有一个线程,负责处理连接事件,Worker
负责处理读写事件,线程数默认为cpu核心数*2。相当于Reactor多线程模式中的mainReactor和subReactor。
netty服务启动代码:
1 | private final NioEventLoopGroup boss = new NioEventLoopGroup(1);//boss线程池中只有一条线程 |
NioEventLoopGroup其实就是一个线程池,持有一个EventExcutor数组,里面的EventExcutor实际类型是NioEventLoop,NioEventLoop继承了SingleThreadEventExecutor,可以简单理解为一条线程。
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
//初始化线程,newChild返回类型为NioEventLoop
children[i] = newChild(executor, args);
//线程开始执行
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
}
NioEventLoop
的初始化方法:
1 | NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, |
可以看到每个NioEventLoop
中都持有一个Selector
。Channel
(即一条客户端连接)向Boss或Worker线程池中注册,实则是注册到NioEventLoop持有的Selector
中,并得到一个关联此Channel
的SelectionKey
。SelectionKey
中维护着一个Intrest集合(即该连接需要NioEventLoop处理的事件)和就绪件队列,从而后续通过SelectionKey
处理各个Channel
的就绪事件。处理过程比较复杂,不在此深入。
3.3 为ChannelHandler分配线程
当不同Channel向NioEventLoopGroup
进行注册时,如何将各个线程分配给Channel呢?
我们已知Channel中执行任务的部分是ChannelHandler
,其实Netty正是以一个ChannelHandler
为维度去分配线程的。
在Channel中的ChannelPipeline
创建完成后,Netty向ChannelPipeline
中添加ChannelHandler
时调用了addLast(EventExecutorGroup group, String name, ChannelHandler handler)方法,如果没有group参数,会默认使用Worker线程池。
1 | //方法的核心代码 |
从代码中可以得出一个Channel只能在指定的EventExecutorGroup中选择一条EventLoop作为执行线程且不可更改
,而选择eventLoop
的方式为group.next(),依次选择下一个,没有任何锁竞争,也没有限制选择未绑定的eventLoop
,一个eventLoop
可以对应多个ChannelHandler
。
ChannelHandler的线程分配
可见,Netty分配线程的方式是一种“无锁化”的实现,Channel
从创建直到销毁,其中的每个ChannelHandler
会使用指定的线程去执行任务,从而避免了线程分配时的锁竞争。但是由于绑定线程的原因,如果一个Channel
中存在阻塞操作,那么其他绑定了同一线程的Channel
会被影响,这也是在线客服系统中进行优化的关键问题。
3.4 小结
一个Channel在创建之后,会创建一条ChannelPipeline
,并在其中加入处理各种事件的ChannelHandler
。其中的ChannelHandler
会与线程池中的一条线程进行绑定。ChannelHandler
绑定的线程依然会分配给其他ChannelHandler
,线程与ChannelHandler
之间是一对多的关系,也就是说一条线程是会被多个客户端共享的。
对于一个系统而言,其用户数远大于系统cpu的核心数,我们不可能给每个客户端提供单独的线程。这样无疑增加了操作系统线程切换的开销。而在有限的线程数下,netty如此设计,让每条连接确定使用一条执行线程,避免了不同连接之间处理业务逻辑时的线程竞争,整个系统的效率会有所提升,并且减少了系统开销。理解netty中线程的执行过程和分配方式,能够帮助我们在开发中合理使用netty框架。
4.客服IM系统的应用
4.1 读写与业务逻辑分离
当服务端收到消息时,在IM应用程序中会经历三个过程,读取消息 => 业务处理(如身份验证,消息存库,获取机器人回复等)=> 转发消息。
IM应用的处理过程
其中读取消息和转发消息,由netty底层调用native方法,使用的是worker线程池。而我们向ChannelPipeline
中添加各种处理业务逻辑的ChannelHandler
,如果不指定线程池,也会默认使用worker线程池。那么就会产生下图情况,某一channel处理业务逻辑时发生阻塞,导致与该线程绑定的其他channel中的消息无法读取,从而影响到整个系统的读写性能。
绑定同线程的channel读消息时发生堵塞
因此在netty中,如果一个ChannelHandler
中的逻辑存在阻塞的可能时,需要为其指定专门的线程池,避免影响channel的读写性能。
1 | //添加处理websocket握手请求的ChannelHandler,身份验证时存在网络I/O,使用单独的线程池 |
连接、读、写与业务逻辑分离
4.2 坐席端的特殊处理
客服IM系统的业务逻辑处理存在大量的阻塞操作,包括消息写入数据库以及智能会话请求机器人服务等。因此我们需要业务逻辑线程池去专门处理这些阻塞操作。
netty默认分配线程的方式是一个ChannelHandler
只能使用一个线程,其优势主要是在客户端数远大于cpu核心数时减少了线程竞争带来的开销。
对客服系统而言,坐席与用户两种客户端相比的区别很大,总结如下:
(1)与用户数量相比,坐席数量是比较少的的 (2)坐席是服务的提供者,坐席消息的阻塞会导致他接待的所有人工会话客户无法得到服务 (3)坐席的消息处理逻辑与客户差异较大,不存在机器人消息等需要产生RPC调用的消息服务,客户端阻塞时,坐席不一定会发生堵塞 (4)一个坐席会服务多个客户,存在多个会话
假设以下场景:系统某段时间内请求机器人智能服务发生阻塞,那么与机器人会话用户绑定同一线程的坐席消息也无法处理,该坐席所接待的人工会话用户也无法正常得到服务。
此场景下,因为某一用户的消息处理的阻塞,导致大量的会话无法正常进行,影响了系统的可用性。
因此在客服系统中,我们只对客户类型的客户端使用netty推荐的线程处理方式处理业务逻辑,而坐席的业务逻辑处理器,使用常规的自定义线程池。
1 | public abstract class MultiBusinessServerHandler extends BusinessServerHandler { |
如代码中,改变了netty推荐的ChannelHandler
创建时绑定一条确定线程的模式,而是在BusinessServerHandler
触发读事件时再去抢占线程,由于坐席数量是有限的,锁竞争的开销可以忽略。
这样设计解决了以下3个问题:
1 | 1.客户消息阻塞时,不会影响坐席消息的正常处理 |
需要注意的是,这样的业务逻辑处理过程,与netty的读消息流程是完全异步的,在BusinessServerHandler
将任务加入线程池后,该handler便执行完成,就会调用fireEvent()触发下一个ChannelHandler
的读事件,不再是netty推荐模式下,真正处理完成业务逻辑后再触发回调的模式。由于客服系统中BusinessServerHandler
是最后一个读消息处理器,不会产生任何影响。但是如果后续还有对消息的其他处理,那么就要注意message的并发安全问题以及处理顺序的问题。
而对于用户连接,依然使用netty的默认分配方式,创建一个用于业务逻辑的EventLoopGroup,在连接建立时便为客户的业务逻辑处理器指定一个确定的线程,避免线程竞争,降低系统开销。
1 | ctx.channel().pipeline().addLast(eventExecutorGroup, HandlerNames.BUSINESS_SERVER, serverHandler); |
4.3 总结
客服IM系统开发过程中,在充分了解netty线程模式的基础上,对线程分配进行了定制化处理,不影响netty原生态高效读写的同时,保证了系统的可用性并降低了系统开销。Reactor线程模型也给了我们在开发应用服务时处理线程的一些启发。而netty除了优秀的多线程设计外,还有很多设计亮点值得我们继续挖掘,优秀的开源框架在为我们开发提供便利的同时,理解其中的设计与实现,捕捉作者的想法,也使得我们在开发工作中可以高屋建瓴,借鉴其中的思路,将其应用于最为契合的场景之下。