Java知识分享
热爱技术,分享技术

并发编程实战:Latch设计模式

1. 什么是Latch设计模式

比如若干线程并发执行某个特定的任务,然后等到所有的子任务都执行结束之后再统一汇总,比如用户想要查询自己三年以来银行账号的流水,为了保证运行数据库的数据量在-一个恒定的范围之内,通常数据只会保存一年的记录,其他的历史记录或被备份到磁盘,或者被存储于hive数据仓库,或者被转存至备份数据库之中,总之想要三年的流水记录,需要若千个渠道的查询才可以汇齐。
如果一个线程负责执行这样的任务,则需要经历若干次的查询最后汇总返回给用户,很明显这样的操作性能低下,用户体验差,如果我们将每一个渠道的查询交给-一个线程或者若千个线程去查询,然后统一汇总, 那么性能会提高很多,响应时间也会缩短不少。

并发编程实战:Latch设计模式插图

2. CountDownLatch程序实现

2.1 无限等待的Latch

首先定义了一个无限等待的抽象类Latch,在Latch抽象类中定义了await 方法、countDown方法以及getUnarrived 方法,这些方法的用途在代码注释中都有
详细介绍,当然在Latch中的limit 属性至关重要,当limit降低到0时门阀将会被打开。

package com.ibfbc.concurrency.lach;
/**
 * Created by www.ibfbc.com
 */
public abstract class Latch {
    //用于控制多少个线程完成任务时才能打开阀门
    protected int limit;

    public Latch(int limit) {
        this.limit = limit;
    }

    //该方法会使当前线程一直等待,直到所有线程都完成工作。被阻塞的线程是允许被中断的
    public abstract void await() throws InterruptedException;

    //当任务线程完成后调用该方法使计数器减一
    public abstract void countDown();

    //获取当前还有多少线程没有完成任务
    public abstract int getActive();
}

子任务数量达到limit的时候,门阀才能打开, await()方法用于等待所有的子任务
完成,如果到达数量未达到limit的时候,将会无限等待下去,当子任务完成的时候调用countDown()方法使计数器减少-一个,表明我已经完成任务了,getActive() 方法主要用于查询当前有多少个子任务还未结束。

2.2 无限等待的CountDownLatch实现

下面来实现一个无限制等待门阀打开的Latch实现,当limit >0时调用await方法的线程将会进入无限的等待。

package com.ibfbc.concurrency.lach;
/**
 * 实现
 * Created by www.ibfbc.com
 */
public class CountDownLatch extends Latch {
    public CountDownLatch(int limit) {
        super(limit);
    }

    @Override
    public void await() throws InterruptedException {
        synchronized (this) {
            //当limit>0时,当前线程进入阻塞
            while (limit > 0) {
                this.wait();
            }
        }
    }
    @Override
    public void countDown() {
        synchronized (this) {
            if (limit <= 0) {
                throw new IllegalStateException("");
            }
            //使计数器-1,并且通知阻塞线程
            limit--;
            this.notifyAll();
        }
    }

    @Override
    public int getActive() {
        return limit;
    }
}

在上述代码中,await() 方法不断判断limit的数量,大于0时门阀将不能打开,需要持续等待直到limit数量为0为止; countDown) 方法调用之后会导致limit–操作,并且通知wait中的线程再次判断limit的值是否等于0,当limit被减少到了0以下,则抛出状态非法的异常; getActive() 获取当前还有多少个子任务未完成,这个返回值并不一定就是准确的,在多线程的情况下,某个线程在获得Unarived任务数量并且返回之后,有可能limit又被减少,因此getActive()是一个评估值。

2.3 有超时时间的Latch

public abstract void await(TimeUnit timeUnit, long times) throws InterruptedException;

其中TimeUnit代表wait的时间单位,而time则是指定数量的时间单位,在该方法
中又增加了WaitTimeoutException 用于通知当前的等待已经超时。

@Override
public void await(TimeUnit timeUnit, long times) throws InterruptedException, TimeoutException {
    if (times <= 0)
        throw new IllegalArgumentException("the time is invalid.");
    //转为纳秒
    long remainingNanos = timeUnit.toNanos(times);
    final long endNanos = System.nanoTime() + remainingNanos;
    synchronized (this) {
        while (limit > 0) {
            //超时则异常
            if (TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0) {
                throw new TimeoutException("wait time out.");
            }
            //等待remainingNanos,在等待过程中可能被中断,需要重新计算等待remainingNanos
            this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
            remainingNanos = endNanos - System.nanoTime();
        }
    }

}

为了方便计算,我们将所有的时间单位都换算成了纳秒,但是Object的wait方法只能够接受毫秒,因此该方法还涉及了时间的换算,另外如果等待剩余时间不足1毫秒,那么将会抛出TimeoutException 异常通知等待者。

3. 总结

Latch设计模式提供了等待所有子任务完成,然后继续接下来工作的一种设计方法, 自JDK1.5起也提供了CountDownLatch的工具类,其作用与我们创建的并无两样,无论是我们开发的CountDownLatch还是JDK所提供的,当await超时的时候,已完成任务的线程自然正常结束,但是未完成的则不会被中断还会继续执行下去,也就是说CountDownLatch只提供了门阀的功能,并不负责对线程的管理控制,对线程的控制还需要程序员自己控制。

Latch的作用是为了等待所有子任务完成后再执行其他任务,因此可以对Latch进行再次的扩展,增加回调接口用于运行所有子任务完成后的其他任务,增加了回调功能的CountDownLatch代码如下:

public CountDownLatch(int limit, Runnable runnable) {
super(limit);
this.runnable = runnable;
}
public void await() throws InterruptedException {
synchronized (this) {
//当limit>0时,当前线程进入阻塞
while (limit > 0) {
this.wait();
}
}
if (null != runnable) {
runnable.run();
}
}
打赏
本站所有资源均来源于网络,仅供学习使用,请支持正版!Java技术开源 » 并发编程实战:Latch设计模式

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册