Java知识分享
热爱技术,分享技术

多线程设计模式之:Actvice Object设计模式

1. 主动对象模式

ActiveObjects主动对象模式的意思拥有独立的线程,还可以接收异步消息。并且能够返回处理结果。

在java虚拟机中,我们显示调用system.gc(),它就是一个接收异步消息的主动对象。调用GC方法的线程和gc自身的执行线程并不是同一个线程,在这篇文章中,我们将实现一个类似于System.gc的可以接受异步消息的主动对象。

多线程设计模式之:Actvice Object设计模式插图
接受异步消息的主动对象的工作原理

2. 标准ActiveObject模式设计。

将一个接口的方法调用转换成可接受异步消息的主动对象,也是就是方法的执行和方法的调用是在不同的线程种进行的,那么如何使得执行线程指导该如何正确的执行接口方法呢?我们需要将接口方法的参数以及具体的实现封装成特定的Message告知执行线程。如果该接口方法需要返回值,则必须设计成Future的返回形式。

多线程设计模式之:Actvice Object设计模式插图(1)

2.1 OrderService接口设计

package cn.hackcloud.concurrency.activeobject;
import cn.hackcloud.concurrency.future.Future;
public interface OrderService {
    /**
     * 根据订单编号查询订单明细,有入参也有返回值。
     * 但是返回类型必须是Future
     */
    Future<String> findOrderDetails(long orderId);

    /**
     * 提交订单,没有返回值
     */
    void order(String account, long orderId);

}
  • findOrderDetails ( long orderId):通过订单编号获取订单详情,有返回值的方法必须是Future类型的,因为方法的执行是在其他线程中进行的,势必不会立即得到正确的最终结果,通过Future可以立即得到返回。
  • Order ( String account,long orderId):提交用户的订单信息,是一种无返回值的方法。
package cn.hackcloud.concurrency.activeobject;

import cn.hackcloud.concurrency.future.Future;
import cn.hackcloud.concurrency.future.FutureService;

import java.util.concurrent.TimeUnit;

public class OrderServiceImpl implements OrderService {
@Override
public Future<String> findOrderDetails(long orderId) {
return FutureService.<Long, String>newService().submit(input -> {
try {
TimeUnit.SECONDS.sleep(10);
System.out.println("process the orderId->" + orderId);
} catch (Exception e) {
e.printStackTrace();

}
return "the order Details information";
}, orderId, null);
}

@Override
public void order(String account, long orderId) {
try {
TimeUnit.SECONDS.sleep(10);
System.out.println("process the order for account " + account + " orderId->" + orderId);
} catch (Exception e) {
e.printStackTrace();

}
}
}

OrderServiceImpl类是OrderService 的-一个具体实现,该类是在执行线程中要被使用的类,其中findOrderDetails方法通过https://laijun.vip/java/concurrency-215.html我们开发的Future立即返回一个结果,order方法则通过休眠来模拟该方法的执行比较耗时。

2.2 OrderServiceProxy详解

OrderServiceProxy是OrderService的子类,它的作用是将OrderService的每一个方法都封装成MethodMessage,然后提交给ActiveMesage队列,在使用OrderService接口方法的时候,实际上是在调用OrderServiceProxy 中的方法。

package cn.hackcloud.concurrency.activeobject;

import cn.hackcloud.concurrency.future.Future;

import java.util.HashMap;
import java.util.Map;

public class OrderServiceProxy implements OrderService {
private final OrderService orderService;
private final ActiveMessageQueue activeMessageQueue;

public OrderServiceProxy(OrderService orderService,ActiveMessageQueue activeMessageQueue) {
this.orderService = orderService;
this.activeMessageQueue=activeMessageQueue;
}

@Override
public Future<String> findOrderDetails(long orderId) {
//定义一个ActiveFuture,并且可支持立即返回
final ActiveFuture<String> activeFuture=new ActiveFuture<>();
//组装入参
Map<String,Object> params=new HashMap<>();
params.put("orderId",orderId);
params.put("activeFuture",activeFuture);
//把入参和返回的结果封装成MethodMessage
MethodMessage message=new FindOrderDetailsMessage(params,orderService);
//将MethodMessage保存至ActiveMessageQueue队列中
activeMessageQueue.offer(message);
return activeFuture;
}

@Override
public void order(String account, long orderId) {
//把入参和返回的结果封装成MethodMessage,然后offer至队列中
Map<String,Object> params=new HashMap<>();
params.put("account",account);
params.put("orderId",orderId);
MethodMessage message=new FindOrderDetailsMessage(params,orderService);
activeMessageQueue.offer(message);
}
}

OrderServiceProxy作为OrderService 的一个实现,看上去与OrderService没多大关系,其主要作用是将OrderService接口定义的方法封装成MethodMessage,然后offer 给ActiveMessageQueue。若是无返回值的方法,则只需要提交Message到ActiveMessageQueue中即可,但若是有返回值的方法,findOrderDetails 是比较特殊的,它需要返回一个ActiveFuture,该Future的作用是可以立即返回,当调用线程获取结果时将进人阻塞状态。

public class ActiveFuture<T> extends FutureTask<T> {
@Override
protected void finish(T result) {
super.finish(result);
}
}

ActiveFuture非常简单,是FutureTask 的直接子类,其主要作用是重写finish 方法,并且将protected 的权限换成public,可以使得执行线程完成任务之后传递最终结果。

2.3 MethodMessage

MethodMessage的主要作用是收集每一个接口的方法参数,并且提供execute方法供ActiveDaemonThread直接调用,该对象就是典型的Worker Thread模型中的Product (附有使用说明书的半成品,等待流水线工人的加工),execute 方法则是加工该产品的说明书。MethodMessage的代码如下。

public abstract class MethodMessage {
protected final Map<String, Object> params;
protected final OrderService orderService;

public MethodMessage(Map<String, Object> params, OrderService orderService) {
this.params = params;
this.orderService = orderService;
}

public abstract void execute();
}

其中,params主要用来收集方法参数, orderService 是具体的接口实现,每一个方法都会被拆分成不同的Message。在OrderService中,我们定义了两个方法,因此需要实现两个MethodMessage。

public class FindOrderDetailsMessage extends MethodMessage {
    public FindOrderDetailsMessage(Map<String, Object> params, OrderService orderService) {
        super(params, orderService);
    }

    @Override
    public void execute() {
        //1
        Future<String> realFuture = orderService.findOrderDetails((Long) params.get("orderId"));
        ActiveFuture<String> activeFuture = (ActiveFuture<String>) params.get("activeFuture");
        final String result;
        try {
            //2
            result = realFuture.get();
            //3
            activeFuture.finish(result);
        } catch (InterruptedException e) {
            activeFuture.finish(null);
        }

    }
}

在上述代码中:
①执行orderService的findOrderDetails方法。
②调用orderServiceImpl返回的Future.get(),此方法会导致阻塞直到findOrderDetails方法完全执行结束。
③当findOrderDetails执行结束时,将结果通过finish的方法传递activeFuture。


public class OderMessage extends MethodMessage {
public OderMessage(Map<String, Object> params, OrderService orderService) {
super(params, orderService);
}

@Override
public void execute() throws InterruptedException {
String account = (String) params.get("account");
Long orderId = (Long) params.get("orderId");
orderService.order(account, orderId);
}
}

OrderMessage主要处理order方法,从param中获取接口参数,然后执行真正的
OrderService的order方法。

2.4 ActiveMessageQueue

ActiveMessageQueue对应于Worker-Thread 模式中的传送带,主要用于传送调用线程通过Proxy提交过来的MethodMessage,但是这个传送带允许存放无限的MethodMessage(没有limit的约束,理论.上可以放无限多个MethodMessage直到发生堆内存溢出的异常)

  • 在创建ActiveMessageQueue的同时启动ActiveDaemonThread线程,ActiveDaemonThread主要用来进行异步的方法执行,后面我们会介绍。
  • 执行offer方法没有进行limit的判断,允许提交无限个MethodMessage (直到发生堆内存溢出),并且当有新的Message加入时会通知ActiveDaemonThread线程。
  • take 方法主要是被ActiveDaemonThread线程使用,当message队列为空时ActiveDaemonThread线程将会被挂起(Guarded Suspension)
/**
* ActiveDaemonThread是一个守护线程,主要是从queue中获取Message然后执行
* execute方法(注意:保持为线程命名的习惯是一个比较好的编程习惯)
*/
public class ActiveDaemonThread extends Thread {
private final ActiveMessageQueue queue;

public ActiveDaemonThread(ActiveMessageQueue queue) {
super("ActiveDaemonThread");
this.queue = queue;
setDaemon(true);
}

@Override
public void run() {
for (; ; ) {
MethodMessage methodMessage = this.queue.take();
methodMessage.execute();
}
}
}

我们基本上已经完成了一个标准Active Objects 的设计,接口方法的每一-次调用实际上都是向Queue中提交-一个对应的Message信息,当然这个工作主要是由Proxy完成的,但是为了让Proxy的构造透明化,我们需要设计一个Factory工具类,代码如下。

    private final static ActiveMessageQueue MESSAGE_QUEUE = new ActiveMessageQueue();

private OrderServiceFactory() {

}

public static OrderService toActiveObject(OrderService orderService) {
return new OrderServiceProxy(orderService, MESSAGE_QUEUE);
}
}
public static void main(String[] args) throws InterruptedException {
OrderService orderService = OrderServiceFactory.toActiveObject(new OrderServiceImpl());
orderService.order("hello", 10000);
System.out.println("Return immediately.");
Thread.currentThread().join();
}

运行上面的测试代码会立即得到返回,10 秒之后,order 方法执行结束,调用order方法的线程是主线程,但是执行该方法的线程却是其他线程( ActiveDaemonThread),这也正是ActiveObjects 可接受异步消息的意思。

打赏
本站所有资源均来源于网络,仅供学习使用,请支持正版!Java技术开源 » 多线程设计模式之:Actvice Object设计模式

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册