本文共 11791 字,大约阅读时间需要 39 分钟。
API文档是这样介绍的:一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此使CyclicBarrier很有用。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。 通俗点讲:让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有别屏障拦截的线程才会干活。 类图:/** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties) { this(parties, null); }
Parties:表示拦截线程的数量。
barrierAction:为CyclicBarrier接受的Runnable命令,用于线程到达屏障时,优先执行barrierAction,用于处理复杂的业务场景. <br/>现在我们来看看CyclicBarrier最重要的函数await(): 代码:await()方法: public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }dowait()方法: private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { //分代 final Generation g = generation; //当前generation“已损坏”,抛出BrokenBarrierException异常 //抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrie if (g.broken) //当某个线程试图等待处于断开状态的 barrier 时,或者 barrier 进入断开状态而线程处于等待状态时,抛出该异常 throw new BrokenBarrierException(); //如果线程中断,终止CyclicBarrier if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //进来一个线程 count - 1 int index = --count; //count == 0 表示所有线程均已到位,触发Runnable任务 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; //触发任务 if (command != null) command.run(); ranAction = true; //唤醒所有等待线程,并更新generation nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { //如果不是超时等待,则调用Condition.await()方法等待 if (!timed) trip.await(); else if (nanos > 0L) //超时等待,调用Condition.awaitNanos()方法等待 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); //generation已经更新,返回index if (g != generation) return index; //“超时等待”,并且时间已到,终止CyclicBarrier,并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //释放锁 lock.unlock(); } }
我们再看看jdkAPI文档描述:
public int await() throws InterruptedException, BrokenBarrierException在所有 参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态:最后一个线程到达;或者其他某个线程中断当前线程;或者其他某个线程中断另一个等待线程;或者其他某个线程在等待 barrier 时超时;或者其他某个线程在此 barrier 上调用 reset()。如果当前线程:在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断则抛出 InterruptedException,并且清除当前线程的已中断状态。如果在线程处于等待状态时 barrier 被 reset(),或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出 BrokenBarrierException 异常。如果任何线程在等待时被 中断,则其他所有等待线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作,则在允许其他线程继续运行之前,当前线程将运行该操作。如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。返回:到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,零指示最后一个到达的线程抛出:InterruptedException - 如果当前线程在等待时被中断BrokenBarrierException - 如果 另一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者在调用 await 时 barrier 被损坏,抑或由于异常而导致屏障操作(如果存在)失败。
示例:
public class CyclicBarrierTest { public static CyclicBarrier cyclicBarrier; public static class WorkThread extends Thread { public void run() { try { System.out.println(Thread.currentThread().getName()+"已进入"); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName()+"离开"); } catch (InterruptedException | BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static void main(String[] args) { cyclicBarrier=new CyclicBarrier(4,new Runnable() { public void run() { System.out.println("人已经到齐"); } }); for(int i=0;i<4;i++) { new WorkThread().start(); } }}运行结果: Thread-0已进入Thread-2已进入Thread-1已进入Thread-3已进入人已经到齐Thread-3离开Thread-0离开Thread-1离开Thread-2离开
看一下JDK文档的描述:
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。
构造函数
/** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
从上述可以看出:count :在线程能通过 await() 之前,必须调用 countDown() 的次数
说到这我们来看看CountDownLatch中的连个方法:countDown()和await(); await()方法:public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
描述:
使当前线程在锁存器倒计数至零之前一直等待,除非线程被 中断。如果当前计数为零,则此方法立即返回。如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下两种情况之一前,该线程将一直处于休眠状态:
由于调用 countDown() 方法,计数到达零;或者
其他某个线程中断当前线程。如果当前线程:在进入此方法时已经设置了该线程的中断状态;或者
在等待时被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。countDown()方法:
public void countDown() { sync.releaseShared(1); }
描述:递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
如果当前计数大于零,则将计数减少。如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程。如果当前计数等于零,则不发生任何操作。 示例:public class CountDownLatchTest { private static class WorkThread extends Thread { private CountDownLatch cdl; private int sleepSecond; public WorkThread(String name, CountDownLatch cdl, int sleepSecond) { super(name); this.cdl = cdl; this.sleepSecond = sleepSecond; } public void run() { try { System.out.println(this.getName() + "启动了,时间为" + System.currentTimeMillis()); Thread.sleep(sleepSecond * 1000); cdl.countDown(); System.out.println(this.getName() + "执行完了,时间为" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } } } private static class DoneThread extends Thread { private CountDownLatch cdl; public DoneThread(String name, CountDownLatch cdl) { super(name); this.cdl = cdl; } public void run() { try { System.out.println(this.getName() + "要等待了, 时间为" + System.currentTimeMillis()); cdl.await(); System.out.println(this.getName() + "等待完了, 时间为" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { CountDownLatch cdl = new CountDownLatch(3); DoneThread dt0 = new DoneThread("DoneThread1", cdl); DoneThread dt1 = new DoneThread("DoneThread2", cdl); dt0.start(); dt1.start(); WorkThread wt0 = new WorkThread("WorkThread1", cdl, 2); WorkThread wt1 = new WorkThread("WorkThread2", cdl, 3); WorkThread wt2 = new WorkThread("WorkThread3", cdl, 4); wt0.start(); wt1.start(); wt2.start(); }}运行结果:DoneThread2要等待了, 时间为1529917959491DoneThread1要等待了, 时间为1529917959491WorkThread1启动了,时间为1529917959492WorkThread2启动了,时间为1529917959492WorkThread3启动了,时间为1529917959492WorkThread1执行完了,时间为1529917961493WorkThread2执行完了,时间为1529917962492WorkThread3执行完了,时间为1529917963492DoneThread1等待完了, 时间为1529917963492DoneThread2等待完了, 时间为1529917963492
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。
转载于:https://blog.51cto.com/12666319/2132554