JAVA-Thread-Base-4

Java 线程基础 4

1 线程间的通信

1.1 等待/通知机制

1.1.1 什么是等待通知机制

在单线程编程中,要执行的操作需要满足一定的条件才能执行,可以把这个操作放在 if 语句块中.

在多线程编程中,可能 A 线程的条件没有满足只是暂时的, 稍后其他的线程 B 可能会更新条件使得 A 线程的条件得到满足. 可以将 A 线程暂停,直到它的条件得到满足后再将 A 线程唤醒.它的伪代码:

atomics{ //原子操作
    while( 条件不成立 ){
        等待
    }
    当前线程被唤醒条件满足后,继续执行下面的操作
}

1.1.2 等待/通知机制的实现

Object 类中的 wait()方法可以使执行当前代码的线程等待,暂停执行,直到接到通知或被中断为止.

注意:

  1. wait()方法只能 在同步代码块中由锁对象调用
  2. 调用 wait()方法,当前线程会释放锁

其伪代码如下:

//在调用 wait()方法前获得对象的内部锁
synchronized( 锁对象 ){
    while( 条件不成立 ){
        //通过锁对象调用 wait()方法暂停线程,会释放锁对象
        锁对象.wait();
    }
    //线程的条件满足了继续向下执行
}

Object 类的 notify()可以唤醒线程,该方法也必须在同步代码块中由 锁 对 象 调 用 . 没 有 使 用 锁 对 象 调 用 wait()/notify() 会 抛 出IlegalMonitorStateExeption 异常. 如果有多个等待的线程,notify()方法只能唤醒其中的一个. 在同步代码块中调用 notify()方法后,并不会立即释放锁对象,需要等当前同步代码块执行完后才会释放锁对象,一般将 notify()方法放在同步代码块的最后.

它的伪代码如下:

synchronized( 锁对象 ){
    //执行修改保护条件 的代码
    //唤醒其他线程
    锁对象.notify();
}

package com.sevattal.wait;
/**
 * 需要通过 notify()唤醒等待的线程
 * sevattal
 */
public class Test03 {
    public static void main(String[] args) throws InterruptedException {
        String lock = "sevattal"; //定义一个字符串作为锁对象
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println("线程 1 开始等待: " + System.currentTimeMillis());
                    try {
                        lock.wait(); //线程等待,会释放锁对象,当前线程转入blocked 阻塞状态
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程 1 结束等待:" + System.currentTimeMillis());
                }
            }
        });
        //定义第二个线程,在第二个线程中唤醒第一个线程
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                //notify()方法也需要在同步代码块中,由锁对象调用
                synchronized (lock){
                    System.out.println("线程 2 开始唤醒 : " + System.currentTimeMillis());
                    lock.notify(); //唤醒在 lock 锁对象上等待的某一个线程
                    System.out.println("线程 2 结束唤醒 : " + System.currentTimeMillis());
                }
            }
        });
        t1.start(); //开启 t1 线程,t1 线程等待
        Thread.sleep(3000); //main 线程睡眠 3 秒,确保 t1 入睡
        t2.start(); //t1 线程开启 3 秒后,再开启 t2 线程唤醒 t1 线程
    }
}

1.1.3 interrupt()方法会中断 wait()

当线程处于 wait()等待状态时, 调用线程对象的 interrupt()方法会中断线程的等待状态, 会产生 InterruptedException 异常

package com.sevattal.wait;
/**
 * Interrupt()会中断线程的 wait()等待
 * sevattal
 */
public class Test05 {
    public static void main(String[] args) throws InterruptedException {
        SubThread t = new SubThread();
        t.start();
        Thread.sleep(2000); //主线程睡眠 2 秒, 确保子线程处于 Wait 等待状态
        t.interrupt();
    }
    private static final Object LOCK = new Object(); //定义常量作为锁对象
    static class SubThread extends Thread{
        @Override
        public void run() {
            synchronized (LOCK){
                try {
                    System.out.println("begin wait...");
                    LOCK.wait();
                    System.out.println("end wait..");
                } catch (InterruptedException e) {
                    System.out.println("wait 等待被中断了****");
                }
            }
        }
    }
}

1.1.4 notify()与 notifyAll()

notify()一次只能唤醒一个线程,如果有多个等待的线程,只能随机唤醒其中的某一个; 想要唤醒所有等待线程,需要调用 notifyAll()

package com.sevattal.wait;
/**
 * notify()与 notifyAll() 
 * sevattal
 */
public class Test06 {
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object(); //定义一个对象作为子线程的锁对象
        SubThread t1 = new SubThread(lock);
        SubThread t2 = new SubThread(lock);
        SubThread t3 = new SubThread(lock);
        t1.setName("t1");
        t2.setName("t2");
        t3.setName("t3");
        t1.start();
        t2.start();
        t3.start();
        Thread.sleep(2000);
        //调用 notify()唤醒 子线程
        synchronized (lock){
            // lock.notify(); 
            // 调用一次 notify()只能唤醒其中的一个线程,其他等待的线程依然处于等待状态,对于处于等待状态的线程来说,错过了通知信号,这种现象也称为信号丢失
            lock.notifyAll(); //唤醒所有的线程
        }
    }
    static class SubThread extends Thread{
        private Object lock; //定义实例变量作为锁对象
        public SubThread(Object lock) {
            this.lock = lock;
        }
        @Override
        public void run() {
            synchronized (lock){
                try {
                    System.out.println(Thread.currentThread().getName() + " -- begin wait...");
                    lock.wait();
                    System.out.println( Thread.currentThread().getName() + " -- end wait...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

1.1.5 wait(long)的使用

wait(long)带有 long 类型参数的 wait()等待,如果在参数指定的时间内没有被唤醒,超时后会自动唤醒.

package com.sevattal.wait;
/**
 * wait(long)
 * sevattal
 */
public class Test07 {
    public static void main(String[] args) {
        final Object obj = new Object();
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized ( obj ){
                    try {
                        System.out.println("thread begin wait");
                        obj.wait(5000); //如果 5000 毫秒内没有被唤醒 ,会自动唤醒
                        System.out.println("end wait....");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        t.start();
    }
}

1.1.6 通知过早

线程 wait()等待后,可以调用 notify()唤醒线程, 如果 notify()唤醒的过早,在等待之前就调用了 notify()可能会打乱程序正常的运行逻辑.

package com.sevattal.wait;
/**
 * notify()通知过早
 * sevattal
 */
public class Test08 {
    public static void main(String[] args) {
        final Object Lock = new Object(); //定义对象作为锁对象
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock){
                    try {
                        System.out.println("begin wait");
                        Lock.wait();
                        System.out.println("wait end...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock){
                    System.out.println("begin notify");
                    Lock.notify();;
                    System.out.println("end nofity");
                }
            }
        });
        //如果先开启 t1,再开启 t2 线程,大多数情况下, t1 先等待,t1 再把 t1 唤醒
        // t1.start();
        // t2.start();
        //如果先开启 t2 通知线程,再开启 t1 等待线程,可能会出现 t1 线程等待没有收到通知的情况,t2.start();
        t1.start();
    }
}

package com.sevattal.wait;
/**
 * notify()通知过早, 就不让线程等待了
 * sevattal
 */
public class Test09 {
    static boolean isFirst = true; //定义静态变量作为是否第一个运行的线程标志
    public static void main(String[] args) {
        final Object Lock = new Object(); //定义对象作为锁对象
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock){
                    while ( isFirst ) { //当线程是第一个开启的线程就等待
                        try {
                            System.out.println("begin wait");
                            Lock.wait();
                            System.out.println("wait end...");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock){
                    System.out.println("begin notify");
                    Lock.notify();;
                    System.out.println("end nofity");
                    isFirst = false; //通知后,就把第一个线程标志修改为 false
                }
            }
        });
        //如果先开启 t1,再开启 t2 线程,大多数情况下, t1 先等待,t1 再把 t1 唤醒
        // t1.start();
        // t2.start();
        //如果先开启 t2 通知线程,再开启 t1 等待线程,可能会出现 t1 线程等待没有收到通知的情况,t2.start();
        t1.start();
        //实际上,调用 start()就是告诉线程调度器,当前线程准备就绪,线程调度器在什么时
        //候开启这个线程不确定,即调用 start()方法的顺序,并不一定就是线程实际开启的顺序.
        // 在当前示例中,t1 等待后让 t2 线程唤醒 , 如果 t2 线程先唤醒了,就不让 t1 线程等待了
    }
}

1.1.7 wait 等待条件发生了变化

在使用 wait/nofity 模式时,注意 wait 条件发生了变化,也可能会造成逻辑的混乱

package com.sevattal.wait;
import java.util.ArrayList;
import java.util.List;
/**
 * wait 条件发生变化
 * 定义一个集合
 * 定义一个线程向集合中添加数据,添加完数据后通知另外的线程从集合中取数据
 * 定义一个线程从集合中取数据,如果集合中没有数据就等待
 * sevattal
 */
public class Test10 {
    public static void main(String[] args) {
        //定义添加数据的线程对象
        ThreadAdd threadAdd = new ThreadAdd();
        //定义取数据的线程对象
        ThreadSubtract threadSubtract = new ThreadSubtract();
        threadSubtract.setName("subtract 1 ");
        //测试一: 先开启添加数据的线程,再开启一个取数据的线程,大多数情况下会正常取数据
        // threadAdd.start();
        // threadSubtract.start();
        //测试二: 先开启取数据的线程,再开启添加数据的线程, 取数据的线程会先等待, 等到添加数据之后 ,再取数据
        // threadSubtract.start();
        // threadAdd.start();
        //测试三: 开启两个 取数据的线程,再开启添加数据的线程
        ThreadSubtract threadSubtract2 = new ThreadSubtract();
        threadSubtract2.setName("subtract 2 ");
        threadSubtract.start();
        threadSubtract2.start();
        threadAdd.start();
    /* 某一次执行结果如下:
     * subtract 1 begin wait.... subtract 2 从集合中取了 data 后,集合中数据的数量:0
     * subtract 1 end wait.. Exception in thread "subtract 1 " java.lang.IndexOutOfBoundsException:
     * 分析可能的执行顺序:
     * threadSubtract 线程先启动, 取数据时,集合中没有数据,wait()等待
     * threadAdd 线程获得 CPU 执行权, 添加数据 , 把 threadSubtract 线程唤醒, threadSubtract2 线程开启后获得 CPU 执行权, 正常取数据
     * threadSubtract 线程获得 CPU 执行权, 打印 end wait..., 然后再执行
     * list.remove(0) 取 数 据 时 , 现 在 list 集 合 中 已 经 没 有 数 据 了 , 这 时 会 产 生
     * java.lang.IndexOutOfBoundsException 异常
     * 出现异常的原因是: 向 list 集合中添加了一个数据,remove()了两次
     * 如何解决?
     * 当等待的线程被唤醒后, 再判断一次集合中是否有数据可取. 即需要把
     * sutract()方法中的 if 判断改为 while
     * */
    }
    //1)定义 List 集合
    static List list = new ArrayList<>();
    //2)定义方法从集合中取数据
    public static void subtract(){
        synchronized (list) {
            // if (list.size() == 0) {
            while (list.size() == 0) {
                try {
                    System.out.println(Thread.currentThread().getName() + " begin wait....");
                    list.wait(); //等待
                    System.out.println(Thread.currentThread().getName() + " end wait..");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Object data = list.remove(0); //从集合中取出一个数据
            System.out.println( Thread.currentThread().getName() + "从集合中取了" + data +
                    "后,集合中数据的数量:" + list.size());
        }
    }
    //3)定义方法向集合中添加数据后,通知等待的线程取数据
    public static void add(){
        synchronized (list){
            list.add("data");
            System.out.println( Thread.currentThread().getName() + "存储了一个数据");
            list.notifyAll();
        }
    }
    //4)定义线程类调用 add()取数据的方法
    static class ThreadAdd extends Thread{
        @Override
        public void run() {
            add();
        }
    }
    //定义线程类调用 subtract()方法
    static class ThreadSubtract extends Thread{
        @Override
        public void run() {
            subtract();
        }
    }
}

1.1.8 生产者消费者模式

在 Java 中,负责产生数据的模块是生产者,负责使用数据的模块是消费者. 生产者消费者解决数据的平衡问题,即先有数据然后才能使用,没有数据时,消费者需要等待

1 生产-消费:操作值
package com.sevattal.producerdata;
/**
 * 定义一个操作数据的类
 * sevattal
 */
public class ValueOP {
    private String value = "";
    //定义方法修改 value 字段的值
    public void setValue(){
        synchronized ( this ){
            //如果 value 值不是""空串就等待
            while ( !value.equalsIgnoreCase("")){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //如果 value 字段值是容串, 就设置 value 字段的值
            String value = System.currentTimeMillis() + " - " + System.nanoTime();
            System.out.println("set 设置的值是: " + value);
            this.value = value;
            // this.notify(); //在多生产者多消费者环境中,notify()不能保证是生产者唤醒消费者,如果生产者唤醒的还是生产者可能会出现假死的情况
            this.notifyAll();
        }
    }
    //定义方法读取字段值
    public void getValue(){
        synchronized (this){
            //如果 value 是空串就等待
            while ( value.equalsIgnoreCase("")){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //不是空串,读取 字段值
            System.out.println("get 的值是: " + this.value);
            this.value = "";
            this.notifyAll();
        }
    }
}

package com.sevattal.producerdata;
/**
 * 定义线程类模拟生产者
 * sevattal
 */
public class ProducerThread extends Thread {
    //生产者生产数据就是调用 ValueOP 类的 setValue 方法给 value 字段赋值
    private ValueOP obj;
    public ProducerThread(ValueOP obj) {
        this.obj = obj;
    }
    @Override
    public void run() {
        while (true){
            obj.setValue();
        }
    }
}

package com.sevattal.producerdata;
/**
 * 定义线程类模拟消费者
 * sevattal
 */
public class ConsumerThread extends Thread {
    //消费者使用数据, 就是使用 ValueOP 类的 value 字段值
    private ValueOP obj;
    public ConsumerThread(ValueOP obj) {
        this.obj = obj;
    }
    @Override
    public void run() {
        while (true){
            obj.getValue();
        }
    }
}

package com.sevattal.producerdata;
/**
 * 测试多生产,多消费的情况
 * sevattal
 */
public class Test2 {
    public static void main(String[] args) {
        ValueOP valueOP = new ValueOP();
        ProducerThread p1 = new ProducerThread(valueOP);
        ProducerThread p2 = new ProducerThread(valueOP);
        ProducerThread p3 = new ProducerThread(valueOP);
        ConsumerThread c1 = new ConsumerThread(valueOP);
        ConsumerThread c2 = new ConsumerThread(valueOP);
        ConsumerThread c3 = new ConsumerThread(valueOP);
        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}
2 操作栈

使生产者把数据存储到 List 集合中, 消费者从 List 集合中取数据, 使用 List 集合模拟栈.

package com.sevattal.produerstack;
import java.util.ArrayList;
import java.util.List;
/**
 * 模拟栈
 * sevattal
 */
public class MyStack {
    private List list = new ArrayList(); //定义集合模拟栈
    private static final int MAX = 3; //集合的最大容量
    //定义方法模拟入栈
    public synchronized void push(){
        //当栈中的数据已满 就等待
        while ( list.size() >= MAX ){
            System.out.println(Thread.currentThread().getName() + " begin wait....");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        String data = "data--" + Math.random();
        System.out.println( Thread.currentThread().getName() + "添加了数据: " + data);
        list.add(data);
        // this.notify(); //当多个生产者多个消费者时,使用 notify()可能会出现假死的情况
        this.notifyAll();
    }
    //定义方法模拟出栈
    public synchronized void pop(){
        //如果没有数据就等待
        while ( list.size() == 0 ){
            try {
                System.out.println(Thread.currentThread().getName() + " begin wait....");
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println( Thread.currentThread().getName() + "出栈数据:" + list.remove(0) );
        this.notifyAll();
    }
}

package com.sevattal.produerstack;
/**
 * 生产者线程
 * sevattal
 */
public class ProduerThread extends Thread {
    private MyStack stack;
    public ProduerThread(MyStack stack) {
        this.stack = stack;
    }
    @Override
    public void run() {
        while (true){
            stack.push();
        }
    }
}

package com.sevattal.produerstack;
/**
 * 消费线程
 * sevattal
 */
public class ConsumerThread extends Thread {
    private MyStack stack;
    public ConsumerThread(MyStack stack) {
        this.stack = stack;
    }
    @Override
    public void run() {
        while (true){
            stack.pop();
        }
    }
}

package com.sevattal.produerstack;
/**
 * 测试多生产多消费的情况
 * sevattal
 */
public class Test02 {
    public static void main(String[] args) {
        MyStack stack = new MyStack();
        ProduerThread p = new ProduerThread(stack);
        ProduerThread p2 = new ProduerThread(stack);
        ProduerThread p3 = new ProduerThread(stack);
        ConsumerThread c1 = new ConsumerThread(stack);
        ConsumerThread c2 = new ConsumerThread(stack);
        ConsumerThread c3 = new ConsumerThread(stack);
        p.setName("生产者 1 号");
        p2.setName("生产者 2 号");
        p3.setName("生产者 3 号");
        c1.setName("消费者 1 号");
        c2.setName("消费者 2 号");
        c3.setName("消费者 3 号");
        p.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}

1.4 ThreadLocal的使用

除了控制资源的访问外, 还可以通过增加资源来保证线程安全. ThreadLocal 主要解决为每个线程绑定自己的值.

package com.sevattal.threadlocal;
/**
 * ThreadLocal 的基本使用
 */
public class Test01 {
    //定义 ThreadLocal 对象
    static ThreadLocal threadLocal = new ThreadLocal();
    //定义线程类
    static class Subthread extends Thread{
        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                //设置线程关联的的值
                threadLocal.set( Thread.currentThread().getName() + " - " + i);
                //调用 get()方法读取关联的值
                System.out.println(Thread.currentThread().getName() + " value = " +
                        threadLocal.get());
            }
        }
    }
    public static void main(String[] args) {
        Subthread t1 = new Subthread();
        Subthread t2 = new Subthread();
        t1.start();
        t2.start();
    }
}

package com.sevattal.threadlocal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * 在多线程环境中,把字符串转换为日期对象,多个线程使用同一个 SimpleDateFormat 对象
 * 可能会产生线程安全问题,有异常
 * 为每个线程指定自己的 SimpleDateFormat 对象, 使用 ThreadLocal */
public class Test02 {
    //定义 SimpleDateFormat 对象,该对象可以把字符串转换为日期
    // private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy 年 MM 月 dd 日 HH:mm:ss");
    static ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<>();
    //定义 Runnable 接口的实现类
    static class ParseDate implements Runnable{
        private int i = 0 ;
        public ParseDate(int i) {
            this.i = i;
        }
        @Override
        public void run() {
            try {
                String text = "2068 年 11 月 22 日 08:28:" + i%60; //构建日期字符串
                // Date date = sdf.parse(text); //把字符串转换为日期
                //先判断当前线程是否有 SimpleDateFormat 对象,如果当前线程没有 SimpleDateFormat 对象就创建一个,如果有就直接使用
                if (threadLocal.get() == null){
                    threadLocal.set(new SimpleDateFormat("yyyy 年 MM 月 dd 日 HH:mm:ss"));
                }
                Date date = threadLocal.get().parse(text);
                System.out.println(i + " -- " + date);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        //创建 100 个线程
        for (int i = 0; i < 100; i++) {
            new Thread(new ParseDate(i)).start();
        }
    }
}

package com.sevattal.threadlocal;
import java.util.Date;
import java.util.Random;
/**
 * ThreadLocal 初始值, 定义 ThreadLocal 类的子类,在子类中重写 initialValue()方法指定初始
 * 值,再第一次调用 get()方法不会返回 null 
 **/
public class Test03 {
    //1) 定义 ThreadLocal 的子类
    static class SubThreadLocal extends ThreadLocal<Date>{
        // 重写 initialValue 方法,设置初始值
        @Override
        protected Date initialValue() {
            // return new Date(); //把当前日期设置为初始化
            return new Date(System.currentTimeMillis() - 1000*60*15);
        }
    }
    //定义 ThreadLocal 对象
    // static ThreadLocal threadLocal = new ThreadLocal();
    //直接使用自定义的 SubThreadLocal 对象
    static SubThreadLocal threadLocal = new SubThreadLocal();
    //定义线程类
    static class SubThread extends Thread{
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                //第一次调用 threadLocal 的 get()方法会返回 null
                System.out.println("---------" + Thread.currentThread().getName() + " value=" + threadLocal .get());
                //如果没有初始值就设置当前日期
                if ( threadLocal.get() == null ){
                    System.out.println("*****************");
                    threadLocal.set(new Date());
                }
                try {
                    Thread.sleep(new Random().nextInt(500));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {
        SubThread t1 = new SubThread();
        t1.start();
        SubThread t2 = new SubThread();
        t2.start();
    }
}
Contents
  1. 1. Java 线程基础 4
    1. 1.1. 1 线程间的通信
      1. 1.1.1. 1.1 等待/通知机制
        1. 1.1.1.1. 1.1.1 什么是等待通知机制
        2. 1.1.1.2. 1.1.2 等待/通知机制的实现
        3. 1.1.1.3. 1.1.3 interrupt()方法会中断 wait()
        4. 1.1.1.4. 1.1.4 notify()与 notifyAll()
        5. 1.1.1.5. 1.1.5 wait(long)的使用
        6. 1.1.1.6. 1.1.6 通知过早
        7. 1.1.1.7. 1.1.7 wait 等待条件发生了变化
        8. 1.1.1.8. 1.1.8 生产者消费者模式
          1. 1.1.1.8.1. 1 生产-消费:操作值
          2. 1.1.1.8.2. 2 操作栈
      2. 1.1.2. 1.4 ThreadLocal的使用
|