Java知识分享
热爱技术,分享技术

监控任务的生命周期

1. 场景描述

虽然Thread为我们提供了可获取状态,以及判断是否alive 的方法,但是这些方法均 是针对线程本身的,而我们提交的任务Runnable在运行过程中所处的状态如何是无法直接 获得的,比如它什么时候开始,什么时候结束,最不好的一-种体验是无法获得Runnable任 务执行后的结果。一般情况下想要获得最终结果,我们不得不为Thread或者Runnable传入 共享变量,但是在多线程的情况下,共享变量将导致资源的竞争从而增加了数据不一致性 的安全隐患。

2. 当观察者模式遇到Thread

当某个对象发生状态改变需要通知第三方的时候,观察者模式就特别适合胜任这样的 工作。观察者模式需要有事件源,也就是引发状态改变的源头,很明显Thread负责执行任 务的逻辑单元,它最清楚整个过程的始末周期,而事件的接收者则是通知接受者一方,严 格意义上的观察者模式是需要Observer的集合的,我们在这里不需要完全遵守这样的规则, 只需将执行任务的每一个阶段都通知给观察者即可。

3. 接口定义

3.1 Observable接口定义

/**
 * 并发编程网
 * www.ibfbc.com
 */
public interface Observable {
    /**
     * 线程的生命周期
     */
    enum Cycle {
        STARTED, RUNNING, DONE, ERROR
    }

    /**
     * 获取当前任务的生命周期状态
     */
    Cycle getCycle();

    /**
     * 定义启动线程的方法、主要是为了屏蔽Thread其他方法
     */
    void start();

    /**
     * 定义线程的中断方法
     */
    void interrupt();
}
该接口主要是暴露给调用者使用的,其中四个枚举类型分别代表了当前任务执行生命 周期的各个阶段,具体如下。
  • getCycle()方法用于获取当前任务处于哪个执行阶段。
  • start() 方法的目的主要是为了屏蔽Thread类其他的API,可通过Observable的start对线程进行启动。
  • interrupt() 方法的作用与start一样,可通过Observable的interrupt对当前线程进行中断。

3.2 TaskLifeCycle

/**
* 并发编程网
* www.ibfbc.com
*/
public interface TaskLifeCycle<T> {
/**
* 任务启动时触发
*/
void onStart(Thread thread);

/**
* 任务运行时触发
*/
void onRunning(Thread thread);

/**
* 任务结束时触发,result是任务执行结束后的结果
*/
void onFinish(Thread thread,T result);

/**
* 任务异常时触发
*/
void onError(Thread thread, Exception e);
}

TaskLifecycle接口定义了在任务执行的生命周期中会被触发的接口,其中Empty- Lifycycle是-一个空的实现,主要是为了让使用者保持对Thread类的使用习惯。

  • onStart (Thread thread) 当任务开始执行时会被回调的方法。
  • onRunning(Threadthread)任务运行时被回调的方法,由于我们针对的是任务的生 命周期,不同于线程生命周期中的RUNNING状态,如果当前线程进人了休眠或者阻塞,那么任务都是running状态。
  • onFinish ( Thread thread, T result)任务正确执行结束后会被回调,其中result是任务执行后的结果,可允许为null。
  • onError ( Thread thread, Exceptione) 任务在运行过程中出现任何异常抛出时,onError方法都将被回调,并将异常信息一并传人。

3.3 Task函数接口定义

public interface Task<T> {
    /**
     * 任务执行接口,允许有返回值
     * @return
     */
    T call();
}

由于我们需要对线程中的任务执行增加可观察的能力,并且需要获得最后的计算结 果,因此Runnable接口在可观察的线程中将不再使用,取而代之的是Task接口,其作用与 Runnable类似,主要用于承载任务的逻辑执行单元。

监控任务的生命周期插图

3.4 ObservableThread实现

public class ObservableThread<T> extends Thread implements Observable {
private final TaskLifeCycle<T> lifeCycle;
private final Task<T> task;
private Cycle cycle;

/**
* 指定Task的实现,默认EmptyLifeCycle
*/
public ObservableThread(Task<T> task) {
this(new EmptyLifeCycle<>(), task);
}

/**
* 指定TaskLifeCycle的实现,默认EmptyLifeCycle
*/
public ObservableThread(TaskLifeCycle<T> lifeCycle, Task<T> task) {
Assert.notNull(task, "the task is required.");
this.lifeCycle = lifeCycle;
this.task = task;
}

@Override
public final void run() {
//在执行线程逻辑单元的时候,分别触发响应的事件
this.update(Cycle.STARTED, null, null);
try {
this.update(Cycle.RUNNING, null, null);
T result = this.task.call();
this.update(Cycle.DONE, result, null);
} catch (Exception e) {
this.update(Cycle.ERROR, null, e);
}
}

private void update(Cycle cycle, T result, Exception e) {
this.cycle = cycle;
if (lifeCycle == null) {
return;
}
Thread thread = Thread.currentThread();
try {
switch (cycle) {
case STARTED:
this.lifeCycle.onStart(thread);
break;
case RUNNING:
this.lifeCycle.onRunning(thread);
break;
case DONE:
this.lifeCycle.onFinish(thread, result);
break;
case ERROR:
this.lifeCycle.onError(thread, e);
break;
}
} catch (Exception ex) {
if (cycle == Cycle.ERROR) {
throw ex;
}
}
}

@Override
public Cycle getCycle() {
return this.cycle;
}

重写父类的run方法,并且将其修饰为final 类型,不允许子类再次对其进行重写,run .方法在线程的运行期间,可监控任务在执行过程中的各个生命周期阶段,任务每经过一个阶段相当于发生了一次事件。

update方法用于通知时间的监听者,此时任务在执行过程中发生了什么,最主要的通知是异常的处理。如果监听者也就是TaskLifecycle,在响应某个事件的过程中出现了意外,则会导致任务的正常执行受到影响,因此需要进行异常捕获,并忽略这些异常信息以保证TaskLifecycle的实现不影响任务的正确执行,但是如果任务执行过程中出现错误并且抛出

重写父类的run方法,并且将其修饰为final 类型,不允许子类再次对其进行重写,run .方法在线程的运行期间,可监控任务在执行过程中的各个生命周期阶段,任务每经过一个阶段相当于发生了一次事件。.
update方法用于通知时间的监听者,此时任务在执行过程中发生了什么,最主要的通知是异常的处理。如果监听者也就是TaskLifecycle,在响应某个事件的过程中出现了意外,则会导致任务的正常执行受到影响,因此需要进行异常捕获,并忽略这些异常信息以保证TaskLifecycle的实现不影响任务的正确执行,但是如果任务执行过程中出现错误并且抛出了异常,那么update方法就不能忽略该异常,需要继续抛出异常,保持与call方法同样的意图。

4. 总结

public class ObservableTest {
    public static void main(String[] args) {
        Observable observable = new ObservableThread<>(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "HELLO WORLD";
        });
        observable.start();
    }
  • 在接口Observable中定义与Thread同样的方法用于屏蔽Thread的其他API,在使用的过程中使用Observable声明ObservableThread的类型,如果使用者还想知道更 多的关于Thread的API,只需要在Observable接口中增加即可。
  • 将ObservableThread中的run方法修饰为final,或者将ObservableThread类修饰为final,防止子类继承重写,导致整个生命周期的监控失效,我们都知道,任务的逻 辑执行单元是存在于run方法之中的,而在ObservableThread中我们摒弃了这一点, 让它专门监控业务执行单元的生命周期,而将真正的业务逻辑执行单元交给了一个 可返回计算结果的接口Task。
  • ObservableThread本身的run方法充当了事件源的发起者,而TaskLifecycle 则扮演了事件回调的响应者。

打赏
本站所有资源均来源于网络,仅供学习使用,请支持正版!Java技术开源 » 监控任务的生命周期

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册