Netty的多线程设计及客服IM系统中的应用

1.背景

前文《在线客服IM系统设计》中介绍了在线客服选择了netty作为开发框架,高效的线程模式是netty的优势之一,本文将介绍Netty的多线程的设计以及在客服IM系统中的应用。

Netty的多线程模式是典型的Reactor线程模式,因此要理解Netty多线程模式先从Reactor谈起。

2.Reactor线程模式

2.1 什么是Reactor?

Reactor是一种广泛应用在服务器端开发的设计模式。我们知道,对于应用服务器,一个主要规律就是,CPU的处理速度是要远远快于IO速度的,如果CPU为了IO操作(例如从Socket读取一段数据)而阻塞显然是不划算的。好一点的方法是分为多进程或者线程去进行处理,但是这样会带来一些进程切换的开销,试想一个进程一个数据读了500ms,期间进程切换到它3次,但是CPU却什么都不能干,就这么切换走了,是不是也不划算?这时我们找到了事件驱动,或者叫回调的方式,来完成这件事情。这种方式就是,应用业务向一个中间人注册一个回调(event handler),当IO就绪后,就这个中间人产生一个事件,并通知此handler进行处理。那么问题来了:我们如何知道IO就绪这个事件,谁来充当这个中间人?Reactor模式的答案是:由一个不断等待和循环的单独进程(线程)来做这件事,它接受所有handler的注册,并负责先在操作系统查询IO是否就绪,在就绪后就调用指定handler进行处理,这个角色的名字就叫做Reactor

img reactor单线程模式

操作系统的I/O就绪事件其实就是连接建立、读、写三种。建立网络连接对于操作系统而言初始化了一个保存了通讯双方IP地址和端口的socket,图中的acceptor就是专门处理这一I/O就绪事件的handler。在Reactor线程模型下,Reactoracceptor和其他handler都在同一线程执行。

2.2 Reactor多线程模式

单线程的缺点显而易见,当其中某个 handler 阻塞时,其他所有的 client 的 handler 都得不到执行,更严重的是,handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。这种单线程模型不能充分利用多核资源。

img Reactor多线程模式

多线程I/O模式下负责处理连接事件的mainReactor只有依然只有一个线程。而subReactor是在一个线程池当中,可以有多个,它们负责去处理读写事件,发挥多核CPU的运算能力。

Reactor单线程模式就好比一个餐厅里,招待新顾客的和用户点餐的是一个服务员负责,这样当一个用户在点餐时,其他进入餐厅的顾客便得不到接待。而多线程模式下由一个服务员A单独负责招待新顾客,用户点餐由其他多个服务员负责。哪怕没有空闲服务员来帮用户点餐,服务员A也可以不停地招待顾客进入餐厅。这就保证了用户的连接事件不会受到其他阻塞操作的影响。

3.Netty的多线程模式

3.1 Netty的核心概念

Netty中定义了Channel,ChannelPipeline,ChannelHandler,ChannelEvent和ChannelFuture五个核心概念。

Channel:表示一个与socket关联的通道。新连接建立时会初始化并绑定一个Channel。其实就是把客户端的连接在应用程序的层面抽象为了一个Channel。Channel是随着连接的创建而被初始化,随着连接的断开而被销毁。连接中消息的读写都会被抽象为channel中的事件.

ChannelPipeline:存在于channel 中的管道。是channel中的通道,如果把channel看作一个处理消息的工厂,那么ChannelPipeline就是工厂里的流水线。读取消息时会有一个读事件会在这个通道中从头到尾走,而发送消息时会有一个写事件在这个通道中从尾到头走。在走的过程中会在这条流水线的各个环节被“加工”。一个事件不一定会走完整条通道,很有可能在某个节点完成处理。ChannelPipeline与Channel一对一绑定,共享生命周期。

ChannelHandler:ChannelPipeline上的处理器,ChannelelHandler是串行的,可以比作流水线上的工人,处理流水线上的各个事件。ChannelHandler必须主动调用下一个ChannelHandler才会让事件向下传递。他们收到一个事件并处理完成后,要主动fireEvent()方法将事件传递给下一个ChannelHandler,下一个ChannelHandler才会继续处理事件,否则就会认为处理结束了。我们可以在Chanel创建之后随时在ChannelPipeline上添加和删除ChannelHandler。

ChannelEvent:事件,channel中的一切活动包括消息读写,连接建立和断开,都会被抽象成事件。也可以主动在channel中主动产生或触发(fire)一个事件,该事件会沿着ChannelPipeline经过后续的ChannelHandler依次被处理。

ChannelFuture:异步结果,当一个事件被处理时,可以直接以ChannelFuture的形式直接返回,不用在当前操作中被阻塞。可以通过 ChannelFuture得到最终的执行结果,具体的做法是在ChannelFuture添加监听器listener,当操作最终被执行完 后,listener会被触发。

简单来说,就是连接建立时,操作系统的连接在应用层表现为一个ChannelChannel中会创建一个ChannelPipeline,可以任意在ChannelPipeline中添加和删除ChannelHandler,加入何种ChannelHandler决定了应用程序会如何处理连接和读写事件,连接中发生的事件都会在Channel中以ChannelEvent形式触发,被ChannelHandler处理。每次ChannelHandler处理ChannelEvent都是异步的,调用处理方法时,会马上返回一个ChannelFuture

channelPipeline中的ChannelHandler是串行的

3.2 Netty的线程池

netty的线程模型正是使用了Reactor多线程模式。Netty启动时会初始化BossWorker两个线程池。Boss中通常只有一个线程,负责处理连接事件,Worker负责处理读写事件,线程数默认为cpu核心数*2。相当于Reactor多线程模式中的mainReactor和subReactor。

netty服务启动代码:

1
2
3
4
5
6
7
8
9
10
private final NioEventLoopGroup boss = new NioEventLoopGroup(1);//boss线程池中只有一条线程
private final NioEventLoopGroup worker = new NioEventLoopGroup();//worker线程数使用默认的cpu核心数*2
private final ServerBootstrap bootstrap = new ServerBootstrap();//初始化服务器
public void initialize(){
bootstrap.group(boss, worker)
.channelFactory(NioServerSocketChannel::new)
.childHandler(initializer)
.bind(webSocketPortConfig.getPort());
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
}

NioEventLoopGroup其实就是一个线程池,持有一个EventExcutor数组,里面的EventExcutor实际类型是NioEventLoop,NioEventLoop继承了SingleThreadEventExecutor,可以简单理解为一条线程。

children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
   //初始化线程,newChild返回类型为NioEventLoop    
   children[i] = newChild(executor, args);
   //线程开始执行  
   for (int j = 0; j < i; j ++) {
       EventExecutor e = children[j];
       e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
    }
}

NioEventLoop的初始化方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

可以看到每个NioEventLoop中都持有一个SelectorChannel(即一条客户端连接)向Boss或Worker线程池中注册,实则是注册到NioEventLoop持有的Selector中,并得到一个关联此ChannelSelectionKeySelectionKey中维护着一个Intrest集合(即该连接需要NioEventLoop处理的事件)和就绪件队列,从而后续通过SelectionKey处理各个Channel的就绪事件。处理过程比较复杂,不在此深入。

3.3 为ChannelHandler分配线程

当不同Channel向NioEventLoopGroup进行注册时,如何将各个线程分配给Channel呢?

我们已知Channel中执行任务的部分是ChannelHandler,其实Netty正是以一个ChannelHandler为维度去分配线程的。

在Channel中的ChannelPipeline创建完成后,Netty向ChannelPipeline中添加ChannelHandler时调用了addLast(EventExecutorGroup group, String name, ChannelHandler handler)方法,如果没有group参数,会默认使用Worker线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
 //方法的核心代码
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//使用指定的EventExecutorGroup创建ChannelPipeline上下文
newCtx = newContext(group, filterName(name, handler), handler);
//添加创建的上下文
addLast0(newCtx);
//如果和当前上下文处于同一个线程中,同步执行,否则开启新的上下文中的线程
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

//ChannelPipeline中创建上下文的方法
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
//childExecutor选择了一条线程
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

//ChannelPipeline中childExecutor方法核心逻辑
private EventExecutor childExecutor(EventExecutorGroup group) {
//如果这个ChannelPipeline中已经记录了该线程池对应的执行线程,那么直接选择
EventExecutor childExecutor = childExecutors.get(group);
//没有对应记录时,选择并且记录执行线程,后续Channel中的所有事件都在此线程中触发
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}

从代码中可以得出一个Channel只能在指定的EventExecutorGroup中选择一条EventLoop作为执行线程且不可更改,而选择eventLoop的方式为group.next(),依次选择下一个,没有任何锁竞争,也没有限制选择未绑定的eventLoop,一个eventLoop可以对应多个ChannelHandler

ChannelHandler的线程分配

可见,Netty分配线程的方式是一种“无锁化”的实现,Channel从创建直到销毁,其中的每个ChannelHandler会使用指定的线程去执行任务,从而避免了线程分配时的锁竞争。但是由于绑定线程的原因,如果一个Channel中存在阻塞操作,那么其他绑定了同一线程的Channel会被影响,这也是在线客服系统中进行优化的关键问题。

3.4 小结

一个Channel在创建之后,会创建一条ChannelPipeline,并在其中加入处理各种事件的ChannelHandler。其中的ChannelHandler会与线程池中的一条线程进行绑定。ChannelHandler绑定的线程依然会分配给其他ChannelHandler,线程与ChannelHandler之间是一对多的关系,也就是说一条线程是会被多个客户端共享的。

对于一个系统而言,其用户数远大于系统cpu的核心数,我们不可能给每个客户端提供单独的线程。这样无疑增加了操作系统线程切换的开销。而在有限的线程数下,netty如此设计,让每条连接确定使用一条执行线程,避免了不同连接之间处理业务逻辑时的线程竞争,整个系统的效率会有所提升,并且减少了系统开销。理解netty中线程的执行过程和分配方式,能够帮助我们在开发中合理使用netty框架。

4.客服IM系统的应用

4.1 读写与业务逻辑分离

当服务端收到消息时,在IM应用程序中会经历三个过程,读取消息 => 业务处理(如身份验证,消息存库,获取机器人回复等)=> 转发消息。

IM应用的处理过程

其中读取消息和转发消息,由netty底层调用native方法,使用的是worker线程池。而我们向ChannelPipeline中添加各种处理业务逻辑的ChannelHandler,如果不指定线程池,也会默认使用worker线程池。那么就会产生下图情况,某一channel处理业务逻辑时发生阻塞,导致与该线程绑定的其他channel中的消息无法读取,从而影响到整个系统的读写性能。

绑定同线程的channel读消息时发生堵塞

因此在netty中,如果一个ChannelHandler中的逻辑存在阻塞的可能时,需要为其指定专门的线程池,避免影响channel的读写性能。

1
2
3
4
//添加处理websocket握手请求的ChannelHandler,身份验证时存在网络I/O,使用单独的线程池
pipeline.addLast(eventExecutorGroup1, HandlerNames.SECURITY_HAND_SHAKE_SERVER, securityHandShakeHandler);
//添加处理用户消息的ChannelHandler,存在大量数据库读写,网络请求等I/O操作,使用单独的线程池
pipeline.addLast(eventExecutorGroup2, HandlerNames.BUSINESS_SERVER, serverHandler);

连接、读、写与业务逻辑分离

4.2 坐席端的特殊处理

客服IM系统的业务逻辑处理存在大量的阻塞操作,包括消息写入数据库以及智能会话请求机器人服务等。因此我们需要业务逻辑线程池去专门处理这些阻塞操作。

netty默认分配线程的方式是一个ChannelHandler只能使用一个线程,其优势主要是在客户端数远大于cpu核心数时减少了线程竞争带来的开销。

对客服系统而言,坐席与用户两种客户端相比的区别很大,总结如下:

(1)与用户数量相比,坐席数量是比较少的的 (2)坐席是服务的提供者,坐席消息的阻塞会导致他接待的所有人工会话客户无法得到服务 (3)坐席的消息处理逻辑与客户差异较大,不存在机器人消息等需要产生RPC调用的消息服务,客户端阻塞时,坐席不一定会发生堵塞 (4)一个坐席会服务多个客户,存在多个会话

假设以下场景:系统某段时间内请求机器人智能服务发生阻塞,那么与机器人会话用户绑定同一线程的坐席消息也无法处理,该坐席所接待的人工会话用户也无法正常得到服务。

此场景下,因为某一用户的消息处理的阻塞,导致大量的会话无法正常进行,影响了系统的可用性。

因此在客服系统中,我们只对客户类型的客户端使用netty推荐的线程处理方式处理业务逻辑,而坐席的业务逻辑处理器,使用常规的自定义线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public abstract class MultiBusinessServerHandler extends BusinessServerHandler {
protected TraceableThreadPoolExecutor executor;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String text) throws Exception {
try {
//使用业务线程池处理消息
executor.execute(() -> {
try {
super.channelRead0(ctx, text);
} catch (Exception e) {
exceptionCaught(ctx, e);
}
});
} catch (YqgException e){
if(e.getErrorCode() == OnlineCustomerErrorCode.WEBSOCKET_BUSINESS_POOL_REJECT){
rejectMethod();
}
throw e;
}
}

如代码中,改变了netty推荐的ChannelHandler创建时绑定一条确定线程的模式,而是在BusinessServerHandler触发读事件时再去抢占线程,由于坐席数量是有限的,锁竞争的开销可以忽略。

这样设计解决了以下3个问题:

1
2
3
1.客户消息阻塞时,不会影响坐席消息的正常处理
2.存在空闲线程时,某坐席因为系统原因发生阻塞,不会影响其他坐席工作
3.存在空闲线程时,某坐席的一条消息阻塞,也不会影响他发送其他消息

需要注意的是,这样的业务逻辑处理过程,与netty的读消息流程是完全异步的,在BusinessServerHandler将任务加入线程池后,该handler便执行完成,就会调用fireEvent()触发下一个ChannelHandler的读事件,不再是netty推荐模式下,真正处理完成业务逻辑后再触发回调的模式。由于客服系统中BusinessServerHandler是最后一个读消息处理器,不会产生任何影响。但是如果后续还有对消息的其他处理,那么就要注意message的并发安全问题以及处理顺序的问题。

而对于用户连接,依然使用netty的默认分配方式,创建一个用于业务逻辑的EventLoopGroup,在连接建立时便为客户的业务逻辑处理器指定一个确定的线程,避免线程竞争,降低系统开销。

1
ctx.channel().pipeline().addLast(eventExecutorGroup, HandlerNames.BUSINESS_SERVER, serverHandler);

4.3 总结

客服IM系统开发过程中,在充分了解netty线程模式的基础上,对线程分配进行了定制化处理,不影响netty原生态高效读写的同时,保证了系统的可用性并降低了系统开销。Reactor线程模型也给了我们在开发应用服务时处理线程的一些启发。而netty除了优秀的多线程设计外,还有很多设计亮点值得我们继续挖掘,优秀的开源框架在为我们开发提供便利的同时,理解其中的设计与实现,捕捉作者的想法,也使得我们在开发工作中可以高屋建瓴,借鉴其中的思路,将其应用于最为契合的场景之下。