Java知识分享
热爱技术,分享技术

并发编程实战:手写之Future设计模式

假设有个任务需要执行比较长的时间,通常需要等待任务执行结束或者出错了返回结果,在此期间调用者只能阻塞等待返回,因此Futrue设计模式提供了一种凭据式的解决方案。

自JDK1.5起,Java提供了比较强大的Future接口,在JDK1.8时更是引入了CompletableFuture,结合函数式接口可以实现更强大的功能。

2.Future设计模式实现

并发编程实战:手写之Future设计模式插图

Future设计模式类图

2.1 接口定义

2.1.1 Future接口设计

package com.ibfbc.concurrency.future;

/**
 * Future接口定义
 * Created by www.ibfbc.com
 */
public interface Future<T> {
    //返回计算结果,该方法会陷入阻塞状态
    T get() throws InterruptedException;

    //判断任务是否已经被执行完成
    boolean done();
}

2.1.2 FutureService接口设计

FutureService主要是提交任务,提交任务的主要有两种,第一种不需要返回值,

第二种则需要获得最终的计算结果。FutureService接口中提供了对FutureServiceImpl构建的工厂方法,JDK8不仅支持default方法还支持静态方法。

2.1.3 Task接口设计

Task接口主要提供给调用者实现计算逻辑。可以接受一个参数并且返回结果。类似于Callable接口。

package com.ibfbc.concurrency.future;
/**
 * 用于调用者实现计算逻辑
 * Created by www.ibfbc.com
 */
@FunctionalInterface
public interface Task<IN, OUT> {
    //给定一个参数,计算后返回结果
    OUT get(IN input);
}

3.程序实现

3.1 FutureTask

FutureTask是Future的一个实现,除了实现Future中定义的get方法以及done方法之外,额外增加了finish方法,该方法主要用于接收任务被完成的通知。

package com.ibfbc.concurrency.future;

/**
 * 实现Future
 * Created by www.ibfbc.com
 */
public class FutureTask<T> implements Future<T> {
    //计算结果
    private T result;
    //任务是否完成
    private boolean isDone = false;
    //定义对象锁
    private final Object LOCK = new Object();

    /**
     * 任务还没完成时,调用get方法将会被阻塞
     * @return T
     * @throws InterruptedException
     */
    @Override
    public T get() throws InterruptedException {
        synchronized (LOCK) {

            while (!isDone) {
                LOCK.wait();
            }
        }
        return result;
    }

    /**
     * 计算完成时为result指定结果
     * 并且将isDone设置为true
     * 同时唤醒阻塞中的线程
     * @param result
     */
    protected void finish(T result) {
        synchronized (LOCK) {
            if (isDone) {
                return;
            }
            this.result = result;
            this.isDone = true;
            LOCK.notifyAll();
        }
    }

    @Override
    public boolean done() {
        return isDone;
    }
}

FutureTask中充分利用了线程间的通讯wait和notifyAll,当任务还没有被完成之前调用get方法将会被阻塞。直到任务完成并接收到其他线程的唤醒信号。finish方法接收到任务完成的通知,唤醒了调用get方法被阻塞的线程。

3.2 FutureServiceImpl

package com.ibfbc.concurrency.future;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 当提交任务时创建一个新的线程来受理该任务
 * 进而达到任务异步执行的结果
 * Created by www.ibfbc.com
 */
public class FutureServiceImpl<IN, OUT> implements FutureService<IN, OUT> {
    //为线程名指定前缀,这是一个好习惯
    private final static String FUTURE_THREAD_PREFIX = "FUTURE-";

    private final AtomicInteger nextCounter = new AtomicInteger(0);

    private String getNextName() {
        return FUTURE_THREAD_PREFIX + nextCounter.incrementAndGet();
    }

    /**
     * 任务执行结束后、将null结果传递给future
     */
    @Override
    public Future<?> submit(Runnable runnable) {
        final FutureTask<Void> future = new FutureTask<>();
        new Thread(() -> {
            runnable.run();
            future.finish(null);
        }, getNextName()).start();
        return future;
    }

    /**
     * 任务执行结束后、将真实的结果通过finish方法传递给future
     */
    @Override
    public Future<OUT> submit(Task<IN, OUT> task, IN input) {
        final FutureTask<OUT> future = new FutureTask<>();
        new Thread(() -> {
            OUT result = task.get(input);
            future.finish(result);
        }, getNextName()).start();
        return future;
    }

    @Override
    public Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback) {
        final FutureTask<OUT> future = new FutureTask<>();
        new Thread(() -> {
            OUT result = task.get(input);
            future.finish(result);
            //执行回调接口
            if (null != callback) {
                callback.call(result);
            }
        }, getNextName()).start();
        return future;
    }
}

在FutureServiceImpl的submit方法中,分别启动了新的线程运行任务,起到了异步的作用,在任务最终运行成功之后,会通知FutureTask任务已完成。

4.Future的使用以及技巧总结

Future主要是将一些耗时的操作交给一个线程去执行,从而达到异步的目的,提交线程在提交任务和获得计算结果的过程中可以进行其他的任务执行而不是傻傻的等待结果的返回。

4.1. 无返回值的测试

FutureService<Void, Void> service = FutureService.newService();service.submit(() -> {    
try {        
TimeUnit.SECONDS.sleep(3);   
 } catch (InterruptedException e) {   
     e.printStackTrace();    } 
   System.out.println("I am finish done.");});

4.2. 有返回值的测试

FutureService<String, Integer> service = FutureService.newService();Future<Integer> future = service.submit(input -> {    return input.length();}, "Hello");System.out.println(future.get());

5. 增强FutureService支持回调

使用任务完成时回调的机制可以让调用者不再显示的用过get的方式获得数据而导致进入阻塞,可以在提交任务的时候将回调接口一并注入。


@Override 
public Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback) {
        final FutureTask<OUT> future = new FutureTask<>();
        new Thread(() -> {
            OUT result = task.get(input);
            future.finish(result);
            //执行回调接口
            if (null != callback) {
                callback.call(result);
            }
        }, getNextName()).start();
        return future;
    }

该方法新增了一个callback参数,主要用来接受并处理任务的计算结果,当提交的任务执行完成之后,将结果传递给Callback接口进行进一步的执行。这在提交任务之后不再会因为通过get方法获得结果而陷入阻塞。

/**
* 任务完成后会调用该方法
* Created by www.ibfbc.com
*/
@FunctionalInterface
public interface Callback<T> {
    void call(T t);
}

测试增加callback之后的测试代码

public static void main(String[] args) throws InterruptedException {
        FutureService<String, Integer> service = FutureService.newService();
        service.submit(input -> input.length(), "Hello", result -> System.out.println(result));

    }
打赏
本站所有资源均来源于网络,仅供学习使用,请支持正版!Java技术开源 » 并发编程实战:手写之Future设计模式

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册