Java知识分享
热爱技术,分享技术

Dubbo服务提供者异步执行

1. 基于定义CompletableFuture签名的接口实现异步执行

在provider模块中,基于CompletableFuture签名接口实现异步执行的接口实现类为UserServiceImpl,代码如下:

public class UserServiceImpl implements IUserService {
    private final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES, new SynchronousQueue<>(),
            new NamedThreadFactory("biz"), new ThreadPoolExecutor.CallerRunsPolicy());
    static List<UserAddress> list = new ArrayList<>();

    static {
        UserAddress address1 = new UserAddress(1, "重庆", "1", "李老师", "010-56253825", "Y");
        UserAddress address2 = new UserAddress(2, "成都", "2", "王老师", "010-56253825", "N");

        list.add(address1);
        list.add(address2);
    }

    /**
     * 基于CompletableFuture的签名的接口实现异步执行
     *
     * @param userId
     * @return
     */
    public CompletableFuture<List<UserAddress>> async2(String userId) {
        return CompletableFuture.supplyAsync(() -> list.stream().filter(x -> x.getUserId().equals(userId)).collect(Collectors.toList()), poolExecutor);
    }

}

通过上面的代码我们可以了解到,基于定义CompletableFuture签名接口实现异步执行需要接口方法的返回值为CompletableFuture并且方法内部使用CompletableFuture.supplyAsync让本该由Dubbo内部线程池中的线程处理的服务,转换为由业务线程来处理。该方法会马上返回CompletableFuture对象(所以Dubbo内部线程池中的线程会得到及时释放),传递的业务函数则由业务线程池执行。

消费者调用代码:

public static void asyncCallBackNew() {
ReferenceConfig<IUserService> reference = referenceConfig();
//设置为异步
reference.setAsync(true);
IUserService userService = reference.get();
userService.async2("1").whenComplete((v, t) -> {
print(v);
});
}

调用userService.async()方法直接放了CompletableFuture对象。

2. 使用AsyncContext实现异步执行

/**
* asyncContext实现异步执行
*
* @param userId
* @return
*/

@Override
public List<UserAddress> async(String userId) {
final AsyncContext asyncContext = RpcContext.startAsync();
poolExecutor.execute(() -> {
asyncContext.signalContextSwitch();
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
}
asyncContext.write(list.stream().filter(x -> x.getUserId().equals(userId)).collect(Collectors.toList()));
});
return new ArrayList<>();
// throw new RuntimeException("Exception to show hystrix enabled.");
}

上面代码中使用RpcContext.startAsync方法开启异步执行,返回一个AsyncContext,然后把服务处理任务提交到业务线程池后 async方法直接返回null,异步任务首先执行asyncContext.signalContextSwitch()切换任务的上下文,然后休眠4秒钟延迟任务执行,最后asyncContext.write方法把任务结果写入异步上下文。

打赏
本站所有资源均来源于网络,仅供学习使用,请支持正版!Java技术开源 » Dubbo服务提供者异步执行

评论 1

评论前必须登录!

 

  1. #1

    666

    laijun1个月前 (06-06)

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册