博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
J.U.C工具类中的CountDownLatch和CyclicBarrier
阅读量:7152 次
发布时间:2019-06-29

本文共 11791 字,大约阅读时间需要 39 分钟。

讲解CyclicBarrier

      API文档是这样介绍的:一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此使CyclicBarrier很有用。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。

      CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。
    通俗点讲:让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有别屏障拦截的线程才会干活。
类图:
image
上图可知:CyclicBarrier的内部是使用重入锁ReetrantLock和Condition。
构造函数
image
源码

/**     * Creates a new {@code CyclicBarrier} that will trip when the     * given number of parties (threads) are waiting upon it, and which     * will execute the given barrier action when the barrier is tripped,     * performed by the last thread entering the barrier.     *     * @param parties the number of threads that must invoke {@link #await}     *        before the barrier is tripped     * @param barrierAction the command to execute when the barrier is     *        tripped, or {@code null} if there is no action     * @throws IllegalArgumentException if {@code parties} is less than 1     */    public CyclicBarrier(int parties, Runnable barrierAction) {        if (parties <= 0) throw new IllegalArgumentException();        this.parties = parties;        this.count = parties;        this.barrierCommand = barrierAction;    }    /**     * Creates a new {@code CyclicBarrier} that will trip when the     * given number of parties (threads) are waiting upon it, and     * does not perform a predefined action when the barrier is tripped.     *     * @param parties the number of threads that must invoke {@link #await}     *        before the barrier is tripped     * @throws IllegalArgumentException if {@code parties} is less than 1     */    public CyclicBarrier(int parties) {        this(parties, null);    }

Parties:表示拦截线程的数量。

barrierAction:为CyclicBarrier接受的Runnable命令,用于线程到达屏障时,优先执行barrierAction,用于处理复杂的业务场景.
<br/>
现在我们来看看CyclicBarrier最重要的函数await():
代码:

await()方法:  public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }dowait()方法:   private int dowait(boolean timed, long nanos)            throws InterruptedException, BrokenBarrierException,            TimeoutException {        //获取锁        final ReentrantLock lock = this.lock;        lock.lock();        try {            //分代            final Generation g = generation;            //当前generation“已损坏”,抛出BrokenBarrierException异常            //抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrie            if (g.broken)                //当某个线程试图等待处于断开状态的 barrier 时,或者 barrier 进入断开状态而线程处于等待状态时,抛出该异常                throw new BrokenBarrierException();            //如果线程中断,终止CyclicBarrier            if (Thread.interrupted()) {                breakBarrier();                throw new InterruptedException();            }            //进来一个线程 count - 1            int index = --count;            //count == 0 表示所有线程均已到位,触发Runnable任务            if (index == 0) {  // tripped                boolean ranAction = false;                try {                    final Runnable command = barrierCommand;                    //触发任务                    if (command != null)                        command.run();                    ranAction = true;                    //唤醒所有等待线程,并更新generation                    nextGeneration();                    return 0;                } finally {                    if (!ranAction)                        breakBarrier();                }            }            for (;;) {                try {                    //如果不是超时等待,则调用Condition.await()方法等待                    if (!timed)                        trip.await();                    else if (nanos > 0L)                        //超时等待,调用Condition.awaitNanos()方法等待                        nanos = trip.awaitNanos(nanos);                } catch (InterruptedException ie) {                    if (g == generation && ! g.broken) {                        breakBarrier();                        throw ie;                    } else {                        // We're about to finish waiting even if we had not                        // been interrupted, so this interrupt is deemed to                        // "belong" to subsequent execution.                        Thread.currentThread().interrupt();                    }                }                if (g.broken)                    throw new BrokenBarrierException();                //generation已经更新,返回index                if (g != generation)                    return index;                //“超时等待”,并且时间已到,终止CyclicBarrier,并抛出异常                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            //释放锁            lock.unlock();        }    }

我们再看看jdkAPI文档描述:

public int await()          throws InterruptedException,                 BrokenBarrierException在所有 参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态:最后一个线程到达;或者其他某个线程中断当前线程;或者其他某个线程中断另一个等待线程;或者其他某个线程在等待 barrier 时超时;或者其他某个线程在此 barrier 上调用 reset()。如果当前线程:在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断则抛出 InterruptedException,并且清除当前线程的已中断状态。如果在线程处于等待状态时 barrier 被 reset(),或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出 BrokenBarrierException 异常。如果任何线程在等待时被 中断,则其他所有等待线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作,则在允许其他线程继续运行之前,当前线程将运行该操作。如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。返回:到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,零指示最后一个到达的线程抛出:InterruptedException - 如果当前线程在等待时被中断BrokenBarrierException - 如果 另一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者在调用 await 时 barrier 被损坏,抑或由于异常而导致屏障操作(如果存在)失败。

示例:

public class CyclicBarrierTest {    public static CyclicBarrier cyclicBarrier;    public static class WorkThread extends Thread    {        public void run()        {            try {                System.out.println(Thread.currentThread().getName()+"已进入");                cyclicBarrier.await();                System.out.println(Thread.currentThread().getName()+"离开");            } catch (InterruptedException | BrokenBarrierException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }    }    public static void main(String[] args) {        cyclicBarrier=new CyclicBarrier(4,new Runnable() {            public void run()            {                System.out.println("人已经到齐");            }        });        for(int i=0;i<4;i++)        {            new WorkThread().start();        }    }}运行结果: Thread-0已进入Thread-2已进入Thread-1已进入Thread-3已进入人已经到齐Thread-3离开Thread-0离开Thread-1离开Thread-2离开

讲解CountDownLatch

看一下JDK文档的描述:

  •     一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

  • 用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。

  • CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

  • CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await。

构造函数

/**     * Constructs a {@code CountDownLatch} initialized with the given count.     *     * @param count the number of times {@link #countDown} must be invoked     *        before threads can pass through {@link #await}     * @throws IllegalArgumentException if {@code count} is negative     */    public CountDownLatch(int count) {        if (count < 0) throw new IllegalArgumentException("count < 0");        this.sync = new Sync(count);    }

从上述可以看出:count :在线程能通过 await() 之前,必须调用 countDown() 的次数

说到这我们来看看CountDownLatch中的连个方法:countDown()和await();
await()方法:

public void await() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    } public final void acquireSharedInterruptibly(int arg)            throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        if (tryAcquireShared(arg) < 0)            doAcquireSharedInterruptibly(arg);    }       private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return;                    }                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();            }        } finally {            if (failed)                cancelAcquire(node);        }    }

描述:

使当前线程在锁存器倒计数至零之前一直等待,除非线程被 中断。
如果当前计数为零,则此方法立即返回。

如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下两种情况之一前,该线程将一直处于休眠状态:

由于调用 countDown() 方法,计数到达零;或者

其他某个线程中断当前线程。
如果当前线程:

在进入此方法时已经设置了该线程的中断状态;或者

在等待时被中断,
则抛出 InterruptedException,并且清除当前线程的已中断状态。

countDown()方法:

public void countDown() {        sync.releaseShared(1);    }

描述:递减锁存器的计数,如果计数到达零,则释放所有等待的线程。

如果当前计数大于零,则将计数减少。如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程。
如果当前计数等于零,则不发生任何操作。
示例:

public class CountDownLatchTest {    private static class WorkThread extends Thread    {        private CountDownLatch cdl;        private int sleepSecond;        public WorkThread(String name, CountDownLatch cdl, int sleepSecond)        {            super(name);            this.cdl = cdl;            this.sleepSecond = sleepSecond;        }        public void run()        {            try            {                System.out.println(this.getName() + "启动了,时间为" + System.currentTimeMillis());                Thread.sleep(sleepSecond * 1000);                cdl.countDown();                System.out.println(this.getName() + "执行完了,时间为" + System.currentTimeMillis());            }             catch (InterruptedException e)            {                e.printStackTrace();            }        }    }    private static class DoneThread extends Thread    {        private CountDownLatch cdl;        public DoneThread(String name, CountDownLatch cdl)        {            super(name);            this.cdl = cdl;        }        public void run()        {            try            {                System.out.println(this.getName() + "要等待了, 时间为" + System.currentTimeMillis());                cdl.await();                System.out.println(this.getName() + "等待完了, 时间为" + System.currentTimeMillis());            }             catch (InterruptedException e)            {                e.printStackTrace();            }        }    }    public static void main(String[] args) throws Exception    {        CountDownLatch cdl = new CountDownLatch(3);        DoneThread dt0 = new DoneThread("DoneThread1", cdl);        DoneThread dt1 = new DoneThread("DoneThread2", cdl);        dt0.start();        dt1.start();        WorkThread wt0 = new WorkThread("WorkThread1", cdl, 2);        WorkThread wt1 = new WorkThread("WorkThread2", cdl, 3);        WorkThread wt2 = new WorkThread("WorkThread3", cdl, 4);        wt0.start();        wt1.start();        wt2.start();    }}运行结果:DoneThread2要等待了, 时间为1529917959491DoneThread1要等待了, 时间为1529917959491WorkThread1启动了,时间为1529917959492WorkThread2启动了,时间为1529917959492WorkThread3启动了,时间为1529917959492WorkThread1执行完了,时间为1529917961493WorkThread2执行完了,时间为1529917962492WorkThread3执行完了,时间为1529917963492DoneThread1等待完了, 时间为1529917963492DoneThread2等待完了, 时间为1529917963492

两者之间的比较

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。

转载于:https://blog.51cto.com/12666319/2132554

你可能感兴趣的文章
Maven项目环境搭建实例.
查看>>
Atitit.信息论原理概论attilax总结
查看>>
Openfire 的安装和配置
查看>>
好,开始没做出来 guess-number-higher-or-lower-ii
查看>>
[LeetCode] Find Right Interval 找右区间
查看>>
android View 绘制完成监听
查看>>
igbinary vs serialize vs json_encode
查看>>
禅与摩托车维修的艺术
查看>>
TestNG的简单使用
查看>>
数组可以容纳多少水----------给你出道题
查看>>
Linux之sar命令介绍
查看>>
飘逸的python - 增强的格式化字符串format函数
查看>>
android发送短信样例
查看>>
1044 拦截导弹 1999年NOIP全国联赛提高组 个人博客:attack.cf
查看>>
著名的英文搜索引擎
查看>>
linux中的strip命令简介------给文件脱衣服【转】
查看>>
算法笔记_184:历届试题 约数倍数选卡片(Java)
查看>>
分压、分流原理
查看>>
C# 读取EXCEL文件的三种经典方法
查看>>
SQL学习之联结表的使用
查看>>