批量发送
通过了解 Netty4的原理我们可以得知,当调用 channel 的 writeAndFlush
方法时,Netty 会判断当前发送请求的线程是否是当前 channel 所绑定的 EventLoop
线程,如果不是 EventLooop
则会构造一个写任务 WriteTask
并将其提交到 EventLoop
中稍后执行,其代码(io.netty.channel.AbstractChannelHandlerContext
)如下:
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
//判断当前线程是否是该channel绑定的EventLoop
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
//将写任务提交到EventLoop上稍后执行
if (!safeExecute(executor, task, promise, m, !flush)) {
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}
从这部分源码我们可以了解到 Netty 写消息时总是会保证把任务提交到 EventLoop 线程上处理,而每调度一次 EventLoop 线程去执行写任务 WriteTask 却只能写一个消息。其线程模型如下图:
而改造后的做法是将所有的消息都先提交到一个 WriteQueue
消息写队列上,内部会获取一次 EventLoop
并提交一个任务,该任务的逻辑比较简单,那就是从消息队列上不断的取消息出来并调用 Netty 的 write。其核心源码如下:
@Override
protected void flush(MessageTuple item) {
prepare(item);
Object finalMessage = multiMessage;
if (multiMessage.size() == 1) {
finalMessage = multiMessage.get(0);
}
channel.writeAndFlush(finalMessage).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ChannelPromise cp;
while ((cp = promises.poll()) != null) {
if (future.isSuccess()){
cp.setSuccess();
} else {
cp.setFailure(future.cause());
}
}
}
});
this.multiMessage.removeMessages();
}
执行该 flush
的逻辑时,是处于 EventLoop
线程的,而从前面的 Netty 源码我们知道,当写动作处于 EventLoop
线程中时是会立即执行写动作的,并不会出现线程切换的行为!那么相较于之前每次都直接在用户线程中调用 writeAndFlush
而言,大幅度的减少了用户线程与 EventLoop
线程的切换次数,也使得一次 WriteTask
写出的消息数量有了大幅度提高,达到批量发包的效果,以此提高 dubbo 协议在 小报文
场景下的性能。改造后的模型如下图:
总结
参考源码,
public void enqueue(T message, Executor executor) {
queue.add(message);
scheduleFlush(executor);
}
protected void scheduleFlush(Executor executor) {
if (scheduled.compareAndSet(false, true)) {
executor.execute(() -> this.run(executor));
}
}
核心就是当 当前的线程 执行 send 操作的时候,中间的添加的其他信息,先等待,放到队列中,然后等到上一个执行完毕,一次性执行 n 条。
比较适用于异步长连接的场景,在 A 执行动作后, 等待过程中, B、C、D 都可以继续执行动作,然后等待一起发送。
通过这种方式,减少在 IO 过程中用户与内核的切换开销
参考
Dubbo 性能调优总结文档 · Issue10915 · apache/dubbo · GitHub
issue 10727: optimize performance for `dubbo` protocol by icodening · Pull Request10728 · apache/dubbo · GitHub