概述
在复杂的业务场景下,在处理一个请求时,可能会衍生出大量的向下游的请求。
如上图,处理一个上游请求时,会向下游发散请求10次,将10个下游接口返回的数据进行处理、归并后,完成自身业务逻辑处理并返回上游。
如果是串行的向下游发起10个请求,那整个请求处理的耗时将起码是10个下游响应RT之和,这样整个请求处理的性能会很差,无法满足C端链路上的SLA。
为此,开发同学一般会通过异步、并行请求的方式,同时向下游发起10个向下游的调用。为了实现并行、异步调用,一般有如下两种思路:
- 同步请求转成异步结果后进行结果编排:最直观的思路,但是伸缩性差,线程利用率低
- 异步请求后进行结果编排:利用Dubbo异步请求特性,伸缩性好,线程利用率高
业务逻辑示例
当前应用提供了一个AdminDubboApi,用于计算当日平均订单金额。该接口依赖下游OrderDubboApi接口。 通过调用OrderDubboApi分别获取当日总单量和当日总成交额后,计算平均订单金额后返回给上游调用者。
下游dubbo接口
public interface OrderDubboApi {
/**
* 获取今天成交总单量
* 耗时1s
*
* @return
*/
int countOrderToday();
/**
* 获取今日成交总金额
* 耗时1s
*
* @return
*/
int sumOrderAmountToday();
}
上游调用的dubbo接口
public interface AdminDubboApi {
/**
* 计算订单平均单价
*
* @return
*/
int averageOrderAmount();
}
同步的业务处理逻辑
@Component
public class AdminDubboApiImpl implements AdminDubboApi {
@Autowired
private OrderDubboApi orderDubboApi;
@Override
public int averageOrderAmount() {
//请求下游接口或许订单总数
int orderCount = orderDubboApi.countOrderToday();
//请求下游接口获取订单总额
int orderAmountSum = orderDubboApi.sumOrderAmountToday();
return orderAmountSum / orderCount;
}
}
通过同步、串行的调用下游接口两次获取数据,该接口的RT>=2s,Admin服务dubbo线程被阻塞2s。
同步请求转异步结果
上述示例中的串行请求下游的方式,在大部分场景下存在性能问题,业务逻辑的执行时间依赖于下游的返回时间或超时时间。
当下游接口响应慢时会导致当前请求长时间阻塞、占用dubbo服务端业务处理线程,进而导致出现threadpool is exhausted异常的概率大大增加,对业务有损。
为了优化请求处理耗时,直观的思路是,新创建一个线程池,将上述两次dubbo调用封装成异步任务,投递到线程池中。
@Component
public class AdminDubboApiV2Impl implements AdminDubboApi, InitializingBean {
@Autowired
private OrderDubboApi orderDubboApi;
private ExecutorService executorService;
@Override
public void afterPropertiesSet() throws Exception {
executorService = Executors.newFixedThreadPool(500);
}
@Override
public int averageOrderAmount() {
//异步请求下游接口或许订单总数
CompletableFuture<Integer> orderCountFuture = CompletableFuture.supplyAsync(orderDubboApi::countOrderToday, executorService);
//异步请求下游接口获取订单总额
CompletableFuture<Integer> orderAmountSumFuture = CompletableFuture.supplyAsync(orderDubboApi::sumOrderAmountToday, executorService);
//将上述两个异步结果归并成一个Future
CompletableFuture<Void> compositeFuture = CompletableFuture.allOf(orderCountFuture, orderAmountSumFuture);
try {
//在归并后的异步结果上进行超时阻塞
compositeFuture.get(1, TimeUnit.SECONDS);
//代码进行到此处时,说明上述两个异步任务在1s内完成,已经有结果了
Integer orderCount = orderCountFuture.get(); //此处不会阻塞
Integer orderAmountSum = orderAmountSumFuture.get(); //此处不会阻塞
return orderAmountSum / orderCount; //业务逻辑处理
} catch (InterruptedException | TimeoutException e) {
//任意子future中断或超时时,取消所有子任务
//避免已经不需要执行的子任务还在排队等待执行
orderCountFuture.cancel(false);
orderAmountSumFuture.cancel(false);
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}
}
利用一个辅助线程池,将向下游的dubbo调用异步化、并行化。带来了如下好处:
- dubbo服务端线程被阻塞的时间变短了,从串行调用的2s,减少为并行调用的1s,避免的Dubbo请求处理线程长时间阻塞后没有空闲线程处理新请求
- 整个请求处理的耗时降低了,从同步请求的2s降低为1s
上述可知,整个接口的性能提升,是因为从Dubbo处理线程的角度来看阻塞减少了。
但是从本质上来说,阻塞并没有消失,只是转移了。因为下游处理两次请求不可被外力转移的客观的就需要2s,两次dubbo接口调用一定会在某个地方阻塞某个线程2s。没有什么岁月静好,一定是有人在默默负重前行。
这个阻塞的地方其实是那个独立的辅助线程池。两次请求会阻塞辅助线程池2s,但是传导到Dubbo请求处理线程的阻塞就只有1s了。
这种思路本质上还是隔靴搔痒的补丁方案,虽然我们避免了Dubbo线程的阻塞,尽可能减少了threadpool is exhausted异常,但是瓶颈又到了那个辅助线程池上了。
一方面随着调用量的增长,辅助线程池可能会不够用,存在伸缩性问题;另一方面因为同步Dubbo请求的阻塞没有消除,导致线程的利用率是很低的。辅助线程池的每个线程1s完成一次dubbo请求,其中的大部分时间是在阻塞。虽然线程阻塞不消耗cpu,但是因为线程阻塞导致线程利用率低,导致需要的线程数变多,进而导致线程占用的内存开销变大。
如果能从Dubbo框架底层就实现非阻塞的、事件驱动的异步调用就好了。这样的话就不需要我们在业务代码中利用辅助线程池来实现同步请求转异步请求了。
Dubbo异步请求
从上面的思路一路下来,我们发现,为了实现请求并行,结果归并,最核心就是要想办法拿到一次Dubbo请求的Future。不管是通过辅助线程池来同步转异步还是其他别的方式。
拿到多个请求的Future后,业务代码上就可以编排Future,实现灵活、高效的业务处理。
Dubbo是基于Netty的RPC框架,Netty是非阻塞、事件驱动的网络库,那其实构建在Netty上的Dubbo本身的请求-响应处理也是非阻塞、事件驱动的。
只不过是为了匹配开发者习惯的使用心智,Dubbo框架在底层进行了一次异步转同步。也就是调用dubbo接口方法时,会一直阻塞到下游结果返回。
也就是说,上述的利用辅助线程池实现的同步调用转异步调用,其实是多余的。
org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke
Dubbo底层做了异步转同步,然后业务代码里又在异步转同步的基础上做了同步转异步~
只要我们提示Dubbo说,我下面发起的请求是异步请求,你不要阻塞我,给我一个Future就好。那Dubbo就会省去内置的异步转同步操作,给你一个异步Future。
@Component
public class AdminDubboApiV3Impl implements AdminDubboApi {
@Autowired
private OrderDubboApi orderDubboApi;
@Override
public int averageOrderAmount() {
//org.apache.dubbo.rpc.RpcContext
RpcContext context = RpcContext.getContext();
//异步请求下游接口或许订单总数
CompletableFuture<Integer> orderCountFuture = context.asyncCall(orderDubboApi::countOrderToday);
//异步请求下游接口获取订单总额
CompletableFuture<Integer> orderAmountSumFuture = context.asyncCall(orderDubboApi::sumOrderAmountToday);
//将上述两个异步结果归并成一个Future
CompletableFuture<Void> compositeFuture = CompletableFuture.allOf(orderCountFuture, orderAmountSumFuture);
try {
//在归并后的异步结果上进行超时阻塞
compositeFuture.get(1, TimeUnit.SECONDS);
//代码进行到此处时,说明上述两个异步任务在1s内完成,已经有结果了
Integer orderCount = orderCountFuture.get(); //此处不会阻塞
Integer orderAmountSum = orderAmountSumFuture.get(); //此处不会阻塞
return orderAmountSum / orderCount; //业务逻辑处理
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}
}
如上述,不需要任何额外的辅助线程池,两次dubbo调用直接从dubbo框架层面返回了异步结果CompletableFuture,整个请求过程都是非阻塞的。
后续就是相同的、大家很熟悉的异步请求结果编排了~
服务端异步处理
@Component
public class AdminDubboApiV3Impl implements AdminDubboApi {
@Autowired
private OrderDubboApi orderDubboApi;
@Override
public int averageOrderAmount() {
//org.apache.dubbo.rpc.RpcContext
RpcContext context = RpcContext.getContext();
//异步请求下游接口或许订单总数
CompletableFuture<Integer> orderCountFuture = context.asyncCall(orderDubboApi::countOrderToday);
//异步请求下游接口获取订单总额
CompletableFuture<Integer> orderAmountSumFuture = context.asyncCall(orderDubboApi::sumOrderAmountToday);
//将上述两个异步结果归并成一个Future
CompletableFuture<Void> compositeFuture = CompletableFuture.allOf(orderCountFuture, orderAmountSumFuture);
try {
//在归并后的异步结果上进行超时阻塞
compositeFuture.get(1, TimeUnit.SECONDS);
//代码进行到此处时,说明上述两个异步任务在1s内完成,已经有结果了
Integer orderCount = orderCountFuture.get(); //此处不会阻塞
Integer orderAmountSum = orderAmountSumFuture.get(); //此处不会阻塞
return orderAmountSum / orderCount; //业务逻辑处理
} catch (InterruptedException | TimeoutException e) {
//任意子future中断或超时时,取消所有子任务
//避免已经不需要执行的子任务还在排队等待执行
orderCountFuture.cancel(false);
orderAmountSumFuture.cancel(false);
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}
}
上述请求异步化后,在最终等待结果时,还是会阻塞Dubbo服务端处理线程,最多等待1s后,Dubbo线程才会解除阻塞,处理后的结果才会通过网络异步返回到调用端。
但是在Dubbo框架内部,在业务逻辑处理完成后,都是统一包装成异步结果,以发布订阅、事件驱动的模式推动后续将响应发往客户端的。
org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
也就是说,我们不需要在代码里阻塞等待明确的结果才return,直接把一个异步结果交给dubbo去监听、处理即可。
@Component
public class AdminDubboApiV3Impl implements AdminDubboApi {
@Autowired
private OrderDubboApi orderDubboApi;
@Override
public int averageOrderAmount() {
// org.apache.dubbo.rpc.RpcContext
RpcContext context = RpcContext.getContext();
// 异步请求下游接口或许订单总数
CompletableFuture<Integer> orderCountFuture = context.asyncCall(orderDubboApi::countOrderToday);
// 异步请求下游接口获取订单总额
CompletableFuture<Integer> orderAmountSumFuture = context.asyncCall(orderDubboApi::sumOrderAmountToday);
// 将上述两个异步结果归并成一个Future
CompletableFuture<Void> compositeFuture = CompletableFuture.allOf(orderCountFuture, orderAmountSumFuture);
AsyncContext asyncContext = context.startAsync(); // 通知dubbo,我开始异步处理了,这个方法的返回值忽略
compositeFuture.whenComplete((v, t) -> {
if (t != null) {
asyncContext.write(t);// 通知Dubbo请求处理完了,可以把结果返回给客户端了
return;
}
try {
// 代码进行到此处时,说明上述两个异步任务在1s内完成,已经有结果了
Integer orderCount = orderCountFuture.get(); // 此处不会阻塞
Integer orderAmountSum = orderAmountSumFuture.get(); // 此处不会阻塞
int result = orderAmountSum / orderCount;// 业务逻辑处理
asyncContext.write(result); // 通知Dubbo请求处理完了,可以把结果返回给客户端了
} catch (Exception e) {
asyncContext.write(e);
}
});
return -1; // 结果会被忽略
}
}
不需要改变方法签名,无需调用方(上游)感知、修改调用方式,即可对上游透明的实现服务端非阻塞的逻辑处理。
org.apache.dubbo.rpc.RpcContext#startAsync
org.apache.dubbo.rpc.AsyncContextImpl#write
异步结果编排
当请求过程和响应过程都实现异步化以后,业务代码里就充斥了大量的Future、Callback,很大程度上的增加了编码难度。需要熟练掌握CompletableFuture各个组合、编排API。
大家可以熟悉下响应式流(Reactive Stream),例如RxJava、ReactorProject等。利用其丰富的异步事件组合、编排的算子,把业务代码中异步逻辑编排得更优雅、简洁。后续基架这边也会看看做下相关的分享。
总结
如果一个应用的入站请求是Dubbo、出站请求也是Dubbo,那按上述思路处理后,整个请求处理流程中的RPC相关的网络阻塞都会被消除,但是要做到应用内代码完全的非阻塞、响应式、线程利用率提升,还需要解除其他的阻塞。
比如redis、rmdb等,当然需要评估下非阻塞带来的编码复杂度进行ROI取舍。
非阻塞的、事件驱动的编程范式可以提高线程的利用率。利用事件驱动将阻塞消除后,少量线程即可实现高吞吐的请求处理。目前广泛使用的网络IO模型已经是非阻塞、事件驱动了,只是上层业务代码还是使用同步、阻塞的编程范式。
C/S架构下,如果客户端请求方式、服务端的响应方式都匹配了底层的IO模型,消除了阻塞,那整个系统的性能、资源消耗是可以有很大提升的~