博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
从CompletableFuture学习Dubbo 2.7.x 全链路异步
阅读量:5931 次
发布时间:2019-06-19

本文共 7472 字,大约阅读时间需要 24 分钟。

CompletableFuture学习的小例子

CompletableFuture
objectCompletableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } return "1"; });// objectCompletableFuture.thenApply(r -> {
// System.out.println(r);// return "2";// });// objectCompletableFuture.thenApply(r -> {
// System.out.println(r);// return "2";// }); objectCompletableFuture.thenApply(r -> { System.out.println(r); return "3"; }).whenComplete((result, t) -> { System.out.println(result); }); Thread.sleep(20000L);复制代码

任务执行

下面的分析都是假设CompletableFuture线程未执行完的情况

调用CompletableFuture#supplyAsync方法

public static  CompletableFuture supplyAsync(Supplier supplier) {   	return asyncSupplyStage(asyncPool, supplier); // 默认线程池}static  CompletableFuture asyncSupplyStage(Executor e,                                             Supplier f) { if (f == null) throw new NullPointerException(); CompletableFuture d = new CompletableFuture();// new一个CompletableFuture e.execute(new AsyncSupply(d, f));// 执行一个新线程 return d;}static final class AsyncSupply
extends ForkJoinTask
implements Runnable, AsynchronousCompletionTask { CompletableFuture
dep; Supplier
fn; AsyncSupply(CompletableFuture
dep, Supplier
fn) { this.dep = dep; this.fn = fn; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) {} public final boolean exec() { run(); return true; } public void run() { CompletableFuture
d; Supplier
f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { d.completeValue(f.get()); // completeValue方法会把f的执行结果赋值到CompletableFuture#result } catch (Throwable ex) { d.completeThrowable(ex); } } d.postComplete(); // 通知完成(先跳过这步,假设在上边sleep了) } } }复制代码

假设run方法没有跑完,则CompletableFuture#supplyAsync方法直接放回一个新CompletableFuture对象。

此时调用CompletableFuture#thenApply方法。

public  CompletableFuture thenApply(    Function
fn) { return uniApplyStage(null, fn);}private
CompletableFuture
uniApplyStage( Executor e, Function
f) { if (f == null) throw new NullPointerException(); CompletableFuture
d = new CompletableFuture
(); // 又new一个CompletableFuture if (e != null || !d.uniApply(this, f, null)) { // e默认为空,后面的uniApply会判断this(也就是supplyAsync方法new的第一个CompletableFuture)是否执行完(result是否null),为空就走这个if UniApply
c = new UniApply
(e, d, this, f);// new一个UniApply对象,把this和新new的CompletableFuture封装一下 push(c);// push到this的stack中 c.tryFire(SYNC); } return d;// 返回新new的CompletableFuture}复制代码

此时的supplyAsync返回的CompletableFuture结构。CompletableFuture#stack存了封装了两个CompletableFuture对象的UniApply对象。

类图只是为了展示结构,不标准的哈

thenApply返回新new的CompletableFuture,再调用CompletableFuture#whenComplete方法。和CompletableFuture#supplyAsync方法类似,只是封装类变成了UniWhenComplete。

public CompletableFuture
whenComplete( BiConsumer
action) { return uniWhenCompleteStage(null, action);}private CompletableFuture
uniWhenCompleteStage( Executor e, BiConsumer
f) { if (f == null) throw new NullPointerException(); CompletableFuture
d = new CompletableFuture
(); if (e != null || !d.uniWhenComplete(this, f, null)) { UniWhenComplete
c = new UniWhenComplete
(e, d, this, f); push(c); c.tryFire(SYNC); } return d;}复制代码

类图只是为了展示结构,不标准的哈

现在把文章开头的注释打开,这样会把新new的CompletableFuture的封装类添加到CompletableFuture.Completion#next中,变成一个链表。
next存储的是使用同一个对象执行thenApply等方法形成的链表,而dep存储的是使用每个thenApply等方法返回的CompletableFuture形成的stack?(看起来像链表)。

异步任务执行结果回调

回到CompletableFuture#supplyAsync方法execute的Runnable,当任务执行完会调用CompletableFuture#postComplete方法。

这个方法逻辑比较绕,不想看可以跳过,功能是执行上面添加的所有的lambda回调

final void postComplete() {    /*     * 在每个步骤中,变量f将当前依赖项保存为弹出和运行。 它一次只沿一条路径扩展,推动其他路径以避免无限制的递归。     */    CompletableFuture
f = this; Completion h; while ((h = f.stack) != null || // h临时存放了this stack (f != this && (h = (f = this).stack) != null)) { CompletableFuture
d; Completion t; if (f.casStack(h, t = h.next)) { // 将next链表cas到stack中 if (t != null) { if (f != this) { // 如果f不等于this,则将添加到当前的stack中 pushStack(h); // 这样操作会使得,之前多级结构,变成同一个Stack continue; } h.next = null; // detach } f = (d = h.tryFire(NESTED)) == null ? this : d; // tryFire会执行上述添加的所有lambda回调 } }}复制代码

Dubbo 2.7.x 全链路异步

NettyServerHandler#channelRead方法,Netty IO线程接收到请求。

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);    try {        handler.received(channel, msg);    } finally {        NettyChannel.removeChannelIfDisconnected(ctx.channel());    }}复制代码

经过几层调用后会调用到AllChannelHandler#received方法。会把请求分发到Dubbo内部的Executor。直接返回释放Netty的IO线程。

public void received(Channel channel, Object message) throws RemotingException {    ExecutorService executor = getExecutorService();    try {        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));    } catch (Throwable t) {        // 异常处理,省略    }}复制代码

dubbo内部线程执行后,再经过几层调用后会调用HeaderExchangeHandler#handleRequest方法。

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {    Response res = new Response(req.getId(), req.getVersion());    /**     * 参数校验,省略     */    Object msg = req.getData();    try {        // handle data.        CompletableFuture future = handler.reply(channel, msg); // 最终会调用自己实现的Service        if (future.isDone()) {            res.setStatus(Response.OK);            res.setResult(future.get());            channel.send(res);            return;        }        future.whenComplete((result, t) -> {            try {                if (t == null) {                    res.setStatus(Response.OK);                    res.setResult(result);                } else {                    res.setStatus(Response.SERVICE_ERROR);                    res.setErrorMessage(StringUtils.toString(t));                }                channel.send(res);            } catch (RemotingException e) {                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);            } finally {                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);            }        });    } catch (Throwable e) {        res.setStatus(Response.SERVICE_ERROR);        res.setErrorMessage(StringUtils.toString(e));        channel.send(res);    }}复制代码

如果Service实现了CompletableFuture,则可以把业务处理放到业务线程,释放掉Dubbo线程。

public class AsyncServiceImpl implements AsyncService {    @Override    public CompletableFuture
sayHello(String name) { RpcContext savedContext = RpcContext.getContext(); // 建议为supplyAsync提供自定义线程池,避免使用JDK公用线程池 return CompletableFuture.supplyAsync(() -> { System.out.println(savedContext.getAttachment("consumer-key1")); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "async response from provider."; }); }}复制代码

优秀!~

转载地址:http://rnytx.baihongyu.com/

你可能感兴趣的文章
React Native开发技术
查看>>
“聊天剽窃手”--ptrace进程注入型病毒
查看>>
vue-cli笔记
查看>>
JavaEE学习总结(十四)— 人工智能微博
查看>>
HTML的前世今生
查看>>
工厂模式
查看>>
flexbox父盒子justify-content属性
查看>>
逆向脱壳附加数据处理
查看>>
阿里云启用IPV6
查看>>
intelliJ 打包jar的多种方式
查看>>
ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES)
查看>>
Oracle RAC 实验环境RMAN备份v1.01
查看>>
ControlTemplate in WPF —— ItemsControl
查看>>
把表单转成json,并且name为key,value为值
查看>>
kotlin for android----------MVP模式实现登录
查看>>
.net FrameWork各个版本之间的发展[转]
查看>>
织梦CMS搭建网站必做的服务器相关安全设置
查看>>
张高兴的 Windows 10 IoT 开发笔记:BH1750FVI 光照度传感器
查看>>
SQL Server 服务器主体拥有一个或多个端点无法删除;错误15141
查看>>
Linux-echo、cat命令详解(14)
查看>>