当前位置首页 > 爱问问> 正文

讲解java多线程共享数据 实现java线程通信的几种方式

2021-10-21 02:30:45 爱问问

开发中不免会遇到需要所有子线程执行完毕通知主线程处理某些逻辑的场景。

或者是线程 A 在执行到某个条件通知线程 B 执行某个操作。

可以通过以下几种方式实现:

等待通知机制

等待通知模式是 Java 中比较经典的线程通信方式。

两个线程通过对同一对象调用等待 wait() 和通知 notify() 方法来进行通讯。

如两个线程交替打印奇偶数:

public class TwoThreadWaitNotify {      private int start = 1;      private boolean flag = false;      public static void main(String[] args) {          TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify();          Thread t1 = new Thread(new OuNum(twoThread));          t1.setName("A");          Thread t2 = new Thread(new JiNum(twoThread));          t2.setName("B");          t1.start();          t2.start();      }      /**       * 偶数线程       */      public static class OuNum implements Runnable {          private TwoThreadWaitNotify number;          public OuNum(TwoThreadWaitNotify number) {              this.number = number;          }          @Override          public void run() {              while (number.start <= 100) {                  synchronized (TwoThreadWaitNotify.class) {                      System.out.println("偶数线程抢到锁了");                      if (number.flag) {                          System.out.println(Thread.currentThread().getName() + "+-+偶数" + number.start);                          number.start++;                          number.flag = false;                          TwoThreadWaitNotify.class.notify();                      }else {                          try {                              TwoThreadWaitNotify.class.wait();                          } catch (InterruptedException e) {                              e.printStackTrace();                          }                      }                  }              }          }      }      /**       * 奇数线程       */      public static class JiNum implements Runnable {          private TwoThreadWaitNotify number;          public JiNum(TwoThreadWaitNotify number) {              this.number = number;          }          @Override          public void run() {              while (number.start <= 100) {                  synchronized (TwoThreadWaitNotify.class) {                      System.out.println("奇数线程抢到锁了");                      if (!number.flag) {                          System.out.println(Thread.currentThread().getName() + "+-+奇数" + number.start);                          number.start++;                          number.flag = true;                          TwoThreadWaitNotify.class.notify();                      }else {                          try {                              TwoThreadWaitNotify.class.wait();                          } catch (InterruptedException e) {                              e.printStackTrace();                          }                      }                  }              }          }      }  }

输出结果:

t2+-+奇数93  t1+-+偶数94  t2+-+奇数95  t1+-+偶数96  t2+-+奇数97  t1+-+偶数98  t2+-+奇数99  t1+-+偶数100

这里的线程 A 和线程 B 都对同一个对象 TwoThreadWaitNotify.class 获取锁,A 线程调用了同步对象的 wait() 方法释放了锁并进入 WAITING 状态。

B 线程调用了 notify() 方法,这样 A 线程收到通知之后就可以从 wait() 方法中返回。

这里利用了 TwoThreadWaitNotify.class 对象完成了通信。

有一些需要注意:

  • wait() 、notify()、notifyAll() 调用的前提都是获得了对象的锁(也可称为对象监视器)。

  • 调用 wait() 方法后线程会释放锁,进入 WAITING 状态,该线程也会被移动到等待队列中。

  • 调用 notify() 方法会将等待队列中的线程移动到同步队列中,线程状态也会更新为 BLOCKED

  • 从 wait() 方法返回的前提是调用 notify() 方法的线程释放锁,wait() 方法的线程获得锁。

等待通知有着一个经典范式:

线程 A 作为消费者:

  1. 获取对象的锁。

  2. 进入 while(判断条件),并调用 wait() 方法。

  3. 当条件满足跳出循环执行具体处理逻辑。

线程 B 作为生产者:

  1. 获取对象锁。

  2. 更改与线程 A 共用的判断条件。

  3. 调用 notify() 方法。

伪代码如下:

//Thread A  synchronized(Object){      while(条件){          Object.wait();      }      //do something  }  //Thread B  synchronized(Object){      条件=false;//改变条件      Object.notify();  }

join() 方法

    private static void join() throws InterruptedException {          Thread t1 = new Thread(new Runnable() {              @Override              public void run() {                  LOGGER.info("running");                  try {                      Thread.sleep(3000);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }          }) ;          Thread t2 = new Thread(new Runnable() {              @Override              public void run() {                  LOGGER.info("running2");                  try {                      Thread.sleep(4000);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }          }) ;          t1.start();          t2.start();          //等待线程1终止          t1.join();          //等待线程2终止          t2.join();          LOGGER.info("main over");      }

输出结果:

2018-03-16 20:21:30.967 [Thread-1] INFO  c.c.actual.ThreadCommunication - running2  2018-03-16 20:21:30.967 [Thread-0] INFO  c.c.actual.ThreadCommunication - running  2018-03-16 20:21:34.972 [main] INFO  c.c.actual.ThreadCommunication - main over

在 t1.join() 时会一直阻塞到 t1 执行完毕,所以最终主线程会等待 t1 和 t2 线程执行完毕。

其实从源码可以看出,join() 也是利用的等待通知机制:

核心逻辑:

    while (isAlive()) {          wait(0);      }

在 join 线程完成后会调用 notifyAll() 方法,是在 JVM 实现中调用,所以这里看不出来。

volatile 共享内存

因为 Java 是采用共享内存的方式进行线程通信的,所以可以采用以下方式用主线程关闭 A 线程:

public class Volatile implements Runnable{      private static volatile boolean flag = true ;      @Override      public void run() {          while (flag){              System.out.println(Thread.currentThread().getName() + "正在运行。。。");          }          System.out.println(Thread.currentThread().getName() +"执行完毕");      }      public static void main(String[] args) throws InterruptedException {          Volatile aVolatile = new Volatile();          new Thread(aVolatile,"thread A").start();          System.out.println("main 线程正在运行") ;          TimeUnit.MILLISECONDS.sleep(100) ;          aVolatile.stopThread();      }      private void stopThread(){          flag = false ;      }  }

输出结果:

thread A正在运行。。。  thread A正在运行。。。  thread A正在运行。。。  thread A正在运行。。。  thread A执行完毕

这里的 flag 存放于主内存中,所以主线程和线程 A 都可以看到。

flag 采用 volatile 修饰主要是为了内存可见性,更多内容可以查看这里。

CountDownLatch 并发工具

CountDownLatch 可以实现 join 相同的功能,但是更加的灵活。

    private static void countDownLatch() throws Exception{          int thread = 3 ;          long start = System.currentTimeMillis();          final CountDownLatch countDown = new CountDownLatch(thread);          for (int i= 0 ;i<thread ; i++){              new Thread(new Runnable() {                  @Override                  public void run() {                      LOGGER.info("thread run");                      try {                          Thread.sleep(2000);                          countDown.countDown();                          LOGGER.info("thread end");                      } catch (InterruptedException e) {                          e.printStackTrace();                      }                  }              }).start();          }          countDown.await();          long stop = System.currentTimeMillis();          LOGGER.info("main over total time={}",stop-start);      }

输出结果:

2018-03-16 20:19:44.126 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread run  2018-03-16 20:19:44.126 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread run  2018-03-16 20:19:44.126 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread run  2018-03-16 20:19:46.136 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread end  2018-03-16 20:19:46.136 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread end  2018-03-16 20:19:46.136 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread end  2018-03-16 20:19:46.136 [main] INFO  c.c.actual.ThreadCommunication - main over total time=2012

CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 实现的,更多实现参考 ReentrantLock 实现原理

  • 初始化一个 CountDownLatch 时告诉并发的线程,然后在每个线程处理完毕之后调用 countDown() 方法。

  • 该方法会将 AQS 内置的一个 state 状态 -1 。

  • 最终在主线程调用 await() 方法,它会阻塞直到 state == 0 的时候返回。

CyclicBarrier 并发工具

    private static void cyclicBarrier() throws Exception {          CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ;          new Thread(new Runnable() {              @Override              public void run() {                  LOGGER.info("thread run");                  try {                      cyclicBarrier.await() ;                  } catch (Exception e) {                      e.printStackTrace();                  }                  LOGGER.info("thread end do something");              }          }).start();          new Thread(new Runnable() {              @Override              public void run() {                  LOGGER.info("thread run");                  try {                      cyclicBarrier.await() ;                  } catch (Exception e) {                      e.printStackTrace();                  }                  LOGGER.info("thread end do something");              }          }).start();          new Thread(new Runnable() {              @Override              public void run() {                  LOGGER.info("thread run");                  try {                      Thread.sleep(5000);                      cyclicBarrier.await() ;                  } catch (Exception e) {                      e.printStackTrace();                  }                  LOGGER.info("thread end do something");              }          }).start();          LOGGER.info("main thread");      }        


声明:此文信息来源于网络,登载此文只为提供信息参考,并不用于任何商业目的。如有侵权,请及时联系我们:baisebaisebaise@yeah.net