Java 线程基础 6
1 线程管理
1.1 线程组
类似于在计算机中使用文件夹管理文件,也可以使用线程组来管理线程. 在线程组中定义一组相似(相关)的线程,在线程组中也可以定义子线程组
Thread 类有几个构造方法允许在创建线程时指定线程组,如果在创建线程时没有指定线程组则该线程就属于父线程所在的线程组. JVM 在创建 main 线程时会为它指定一个线程组,因此每个 Java 线程都有一个线程组与之关联, 可以调用线程的 getThreadGroup()方法返回线程组.
线程组开始是出于安全的考虑设计用来 区分不同的 Applet,然而ThreadGroup 并未实现这一目标,在新开发的系统中,已经不常用线程组, 现在一般会将一组相关的线程存入一个数组或一个集合中,如果仅仅是用来区分线程时,可以使用线程名称来区分, 多数情况下,可以忽略线程组
1.1.1 创建线程组
package com.sevattal.threadgroup;
/**
* 演示创建线程组
*/
public class Test01 {
public static void main(String[] args) {
// 1) 返回当前 main 线程的线程组
ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();
System.out.println(mainGroup);
//2) 定义线程组,如果不指定所属线程组,则自动归属当前线程所属的线程组中
ThreadGroup group1 = new ThreadGroup("group1");
System.out.println(group1);
//3)定义线程组, 同时指定父线程组
ThreadGroup group2 = new ThreadGroup(mainGroup, "group2");
//现在 group1 与 group2 都是 maingroup 线程组中的子线程组, 调用线程组的 getParent()方法返回父线程组
System.out.println( group1.getParent() == mainGroup);
//true
System.out.println( group2.getParent() == mainGroup);
//4) 在创建线程时指定所属线程组
Runnable r = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread());
}
};
//在创建线程时,如果没有指定线程组,则默认线程归属到父线程的线程组中
//在 main 线程中创建了 t1 线程,称 main 线程为父线程,t1 线程为子线程, t1 没有指定线程组则 t1 线程就归属到父线程 main 线程的线程组中
Thread t1 = new Thread(r, "t1");
System.out.println( t1 ); // Thread[t1,5,main], t1 的线程组是 main 线程组
//创建线程时,可以指定线程所属线程组
Thread t2 = new Thread(group1, r, "t2");
Thread t3 = new Thread(group2, r, "t3");
System.out.println(t2);
System.out.println(t3);
}
}
1.1.2 线程组的基本操作
activeCount() 返回当前线程组及子线程组中活动线程的数量(近似值)
activeGroupCount() 返回当前线程组及子线程组中活动线程组的数量(近似值)
int enumerate(Thread[] list) 将当前线程组中的活动线程复制到参数数组中
enumerate(ThreadGroup[] list) 将当前线程组中的活动线程组复制到参数数组中
getMaxPriority() 返回线程组的最大优先级,默认是 10
getName() 返回线程组的名称
getParent() 返回父线程组
interrupt() 中断线程组中所有的线程
isDaemon() 判断当前线程组是否为守护线程组
list() 将当前线程组中的活动线程打印出来
parentOf(ThreadGroup g) 判断当前线程组是否为参数线程组的父线程组
setDaemon(boolean daemon) 设置线程组为守护线程组
package com.sevattal.threadgroup;
/**
* 演示线程组的基本操作
*/
public class Test02 {
public static void main(String[] args) {
ThreadGroup mainGroup = Thread.currentThread().getThreadGroup(); //返回当前线程组
//再定义线程组
ThreadGroup group = new ThreadGroup("group"); //默认 group 的父线程组是 main线程组
Runnable r = new Runnable() {
@Override
public void run() {
while (true){
System.out.println("-----------当前线程: " + Thread.currentThread());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread t1 = new Thread(r, "t1"); //默认在 main 线程组中创建线程
Thread t2 = new Thread(group, r, "t2"); //在指定的 group 线程组中创建线程
t1.start();
t2.start();
//打印线程组的相关属性
System.out.println("main 线 程组 中活 动线 程数 量: " + mainGroup.activeCount());
//4, main 线程组中活动线程: main, t1, t2, 垃圾回收器
System.out.println("group 子 线 程 组 中 活 动 线 程 数 量 : " + group.activeCount());
//1, t2
System.out.println("main 线 程 组 中 子 线 程 组 数 量 : " +
mainGroup.activeGroupCount()); //1, group
System.out.println("group 子线程组中子线程组数量: " + group.activeGroupCount());
//0
System.out.println("main 线程组的父线程组: " + mainGroup.getParent()); //main线程组的父线程组是 system
System.out.println("group 线程组的父线程组: " + group.getParent()); //main
System.out.println( mainGroup.parentOf(mainGroup)); //true, 线程组也是它自己的父线程组
System.out.println( mainGroup.parentOf(group)); //true
mainGroup.list(); //把 main 线程组中所有的线程打印输出
}
}
1.1.3 复制线程组中的线程及子线程组
enumerate(Thread[] list) 把当前线程组和子线程组中所有的线程复制到参数数组中
enumerate(Thread[] list, boolean recursive) , 如果第二个参数设置为 false,则只复制当前线程组中所有的线程,不复制子线程组中的线程
enumerate(ThreadGroup[] list) 把当前线程组和子线程组中所有的线程组复制到参数数组中
enumerate(ThreadGroup[] list, boolean recurse) 第二个参数设置false,则只复制当前线程组的子线程组
package com.sevattal.threadgroup;
/**
* 演示复制线程组中的内容
*/
public class Test03 {
public static void main(String[] args) {
ThreadGroup mainGroup = Thread.currentThread().getThreadGroup(); //返回 main线程的 main 线程组
//main 线程组中定义了两个子线程组
ThreadGroup group1 = new ThreadGroup("group1"); //默认 group1 的父线程组就是当前线程组 main
ThreadGroup group2 = new ThreadGroup(mainGroup, "group2");
Runnable r = new Runnable() {
@Override
public void run() {
while (true){
System.out.println("----当前线程: " + Thread.currentThread());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
//创建并启动三个线程
Thread t1 = new Thread(r, "t1"); //默认在 main 线程组中创建线程
Thread t2 = new Thread(group1, r, "t2"); //在 group1 线程组中创建线程
Thread t3 = new Thread(group2, r, "t3"); //在 group2 线程组中创建线程
t1.start();
t2.start();
t3.start();
//1) 把 main 线程组中的线程复制到数组中
//先定义存储线程的数组,数组的长度为 main 线程组中活动线程的数量
Thread[] threadList = new Thread[mainGroup.activeCount()];
/* //把 main 线程组包括子线程组中的所有的线程复制到数组中
mainGroup.enumerate(threadList);
//遍历 threadList 数组
for (Thread thread : threadList) {
System.out.println(thread);
}
System.out.println("----------------------------");
*/
//只把 main 线程组中的线程复制到数组中,不包含子线程组的线程
mainGroup.enumerate(threadList, false);
//遍历 threadList 数组
for (Thread thread : threadList) {
System.out.println(thread);
}
System.out.println("----------------------------");
//2) 把 main 线程组中的子线程组复制到数组中
//定义数组存储线程组
ThreadGroup [] threadGroups = new ThreadGroup[mainGroup.activeGroupCount()];
//把 main 线程组中的子线程组复制到数组中
mainGroup.enumerate(threadGroups);
System.out.println("============================");
for (ThreadGroup threadGroup : threadGroups) {
System.out.println(threadGroup);
}
}
}
1.1.4 线程组的批量中断
线程组的 interrupt() 可以给该线程组中所有的活动线程添加中断标志
package com.sevattal.threadgroup;
/**
* 线程组的批量中断
*/
public class Test04 {
public static void main(String[] args) throws InterruptedException {
Runnable r = new Runnable() {
@Override
public void run() {
System.out.println("当前线程--" + Thread.currentThread() + "--开始循环");
//当线程没有被中断就一直循环
while ( !Thread.currentThread().isInterrupted()){
System.out.println(Thread.currentThread().getName() +
"------------------");
/* try {
Thread.sleep(500);
} catch (InterruptedException e) {
//如果中断睡眠中的线程,产生中断异常, 同时会清除中断标志
e.printStackTrace();
}*/
}
System.out.println(Thread.currentThread().getName() + "循环结束");
}
};
//创建线程组
ThreadGroup group = new ThreadGroup("group");
//在 group 线程组中创建 5 个线程
for (int i = 0; i < 5; i++) {
new Thread(group,r).start();
}
//main 线程睡眠 2 秒
Thread.sleep(50);
//中断线程组, 会中断线程组中所有的线程
group.interrupt();
}
}
1.1.5 设置守护线程组
守护线程是为其他线程提供服务的,当 JVM 中只有守护线程时,守护线程会自动销毁,JVM 会退出.
调用线程组的 setDaemon(true)可以把线程组设置为守护线程组, 当守护线程组中没有任何活动线程时,守护线程组会自动销毁.
注意线程组的守护属性,不影响线程组中线程的守护属性,或者说守护线程组中的线程可以是非守护线程
package com.sevattal.threadgroup;
/**
* 演示设置守护线程组
*/
public class Test05 {
public static void main(String[] args) throws InterruptedException {
//先定义线程组
ThreadGroup group = new ThreadGroup("group");
//设置线程组为守护线程组
group.setDaemon(true);
//向组中添加 3 个线程
for (int i = 0; i < 3; i++) {
new Thread(group, new Runnable() {
@Override
public void run() {
for (int j = 0; j < 20; j++) {
System.out.println(Thread.currentThread().getName() + " -- " + j);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
//main 线程睡眠 5 秒
Thread.sleep(5000);
System.out.println("main...end....");
}
}
1.2 捕获线程的执行异常
在线程的run方法中,如果有受检异常必须进行捕获处理,如果想要获 得 run() 方 法 中 出 现 的 运 行 时 异 常 信 息 , 可 以 通 过 回 调UncaughtExceptionHandler 接口获得哪个线程出现了运行时异常.在Thread 类中有关处理运行异常的方法有:
getDefaultUncaughtExceptionHandler() 获 得 全 局 的(默 认的)UncaughtExceptionHandler
getUncaughtExceptionHandler() 获 得 当 前 线 程 的UncaughtExceptionHandler
setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) 设置全局的 UncaughtExceptionHandler
setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)设置当前线程的 UncaughtExceptionHandler
当线程运行过程中出现异常 ,JVM 会调用 Thread 类的 dispatchUncaughtException(Throwable e) 方法 , 该方法会调用 getUncaughtExceptionHandler().uncaughtException(this, e); 如果想要获得线程中出现异常的信息 , 就需要设置线程的UncaughtExceptionHandler
package com.sevattal.threadexception;
import java.io.FileInputStream;
/**
* 演示设置线程的 UnCaughtExceptionHandler 回调接口
*/
public class Test01 {
public static void main(String[] args) {
//1)设置线程全局的回调接口
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
{
@Override
public void uncaughtException(Thread t, Throwable e) {
//t 参数接收发生异常的线程, e 就是该线程中的异常
System.out.println(t.getName() + "线程产生了异常: " + e.getMessage());
}
});
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "开始运行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
//线程中的受检异常必须捕获处理
e.printStackTrace();
}
System.out.println(12 / 0 ); //会产生算术异常
}
});
t1.start();
new Thread(new Runnable() {
@Override
public void run() {
String txt = null;
System.out.println( txt.length()); //会产生空指针异常
}
}).start();
/* 在实际开发中,这种设计异常处理的方式还是比较常用的,尤其是异常执行的方法
如果线程产生了异常, JVM 会调用 dispatchUncaughtException()方法,在该方法
中调用了 getUncaughtExceptionHandler().uncaughtException(this, e); 如果当前线程设置了
UncaughtExceptionHandler 回调接口就直接调用它自己的 uncaughtException 方法, 如果没有
设置则调用当前线程所在线程组 UncaughtExceptionHandler 回调接口的 uncaughtException 方
法,如果线程组也没有设置回调接口,则直接把异常的栈信息定向到 System.err 中
*/
}
}
1.3 注入 Hook 钩子线程
现在很多软件包括 MySQL, Zookeeper, kafka 等都存在 Hook 线程的校验机制, 目的是校验进程是否已启动,防止重复启动程序.
Hook 线程也称为钩子线程, 当 JVM 退出的时候会执行 Hook 线程. 经常在程序启动时创建一个.lock 文件, 用.lock 文件校验程序是否启动,在程序退出(JVM 退出)时删除该.lock 文件, 在 Hook 线程中除了防止重新启动进程外,还可以做资源释放, 尽量避免在 Hook 线程中进行
package com.sevattal.hook;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
/**
* 通过 Hook 线程防止程序重复启动
*/
public class Test {
public static void main(String[] args) {
//1)注入 Hook 线程,在程序退出时删除.lock 文件
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
System.out.println("JVM 退出,会启动当前 Hook 线程,在 Hook 线程中删 除.lock 文件");
getLockFile().toFile().delete();
}
});
//2)程序运行时,检查 lock 文件是否存在,如果 lock 文件存在,则抛出异常
if ( getLockFile().toFile().exists()){
throw new RuntimeException("程序已启动");
}else { //文件不存在,说明程序是第一次启动,创建 lock 文件
try {
getLockFile().toFile().createNewFile();
System.out.println("程序在启动时创建了 lock 文件");
} catch (IOException e) {
e.printStackTrace();
}
}
//模拟程序运行
for (int i = 0; i < 10; i++) {
System.out.println("程序正在运行");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static Path getLockFile(){
return Paths.get("", "tmp.lock");
}
}
1.4 线程池
1.4.1 什么是线程池
可以以 new Thread( () -> { 线程执行的任务 }).start(); 这种形式开启一个线程. 当 run()方法运行结束,线程对象会被 GC 释放.
在真实的生产环境中,可能需要很多线程来支撑整个应用,当线程数量非常多时 ,反而会耗尽 CPU 资源. 如果不对线程进行控制与管理, 反而会影响程序的性能. 线程开销主要包括: 创建与启动线程的开销;线程销毁开销; 线程调度的开销; 线程数量受限 CPU 处理器数量. 线程池就是有效使用线程的一种常用方式.
线程池内部可以预先创建一定数量的工作线程,客户端代码直接将任务作为一个对象提交给线程池, 线程池将这些任务缓存在工作队列中, 线程池中的工作线程不断地从队列中取出任务并执行.
1.4.2 JDK 对线程池的支持
JDK 提供了一套 Executor 框架,可以帮助开发人员有效的使用线程池
package com.sevattal.threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 线程池的基本使用
*/
public class Test01 {
public static void main(String[] args) {
//创建有 5 个线程大小的线程池,
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
//向线程池中提交 18 个任务,这 18 个任务存储到线程池的阻塞队列中, 线程池中这5 个线程就从阻塞队列中取任务执行
for (int i = 0; i < 18; i++) {
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getId() + " 编号的任务在执 行任务,开始时间: " + System.currentTimeMillis());
try {
Thread.sleep(3000); //模拟任务执行时长
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
package com.sevattal.threadpool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 线程池的计划任务
*/
public class Test02 {
public static void main(String[] args) {
//创建一个有调度功能的线程池
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(10);
//在延迟 2 秒后执行任务, schedule( Runnable 任务, 延迟时长, 时间单位)
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getId() + " -- " +
System.currentTimeMillis() );
}
}, 2, TimeUnit.SECONDS);
//以固定的频率执行任务,开启任务的时间是固定的, 在 3 秒后执行任务,以后每隔 5秒重新执行一次
/* scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getId() + "----在固定频率开启任
务---" + System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(3); //睡眠模拟任务执行时间 ,如果任务执
行时长超过了时间间隔,则任务完成后立即开启下个任务
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 3, 2, TimeUnit.SECONDS);*/
//在上次任务结束后,在固定延迟后再次执行该任务,不管执行任务耗时多长,总是在任务结束后的 2 秒再次开启新的任务
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getId() + "----在固定频率开启任 务---" + System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(3); //睡眠模拟任务执行时间 ,如果任务执行时长超过了时间间隔,则任务完成后立即开启下个任务
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 3, 2, TimeUnit.SECONDS);
}
}
1.4.3 核心线程池的底层实现
查看 Executors 工具类中 newCachedThreadPool(), newSingleThreadExcecutor(), newFixedThreadPool()源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
该线程池在极端情况下,每次提交新的任务都会创建新的线程执行. 适合用来执行大量耗时短并且提交频繁的任务
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
Excutors 工 具 类 中 返 回 线 程 池 的 方 法 底 层 都 使 用 了ThreadPoolExecutor 线程池,这些方法都是 ThreadPoolExecutor 线程池的封装
ThreadPoolExecutor 的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
各个参数含义:
corePoolSize, 指定线程池中核心线程的数量
maxinumPoolSize,指定线程池中最大线程数量
keepAliveTime,当线程池线程的数量超过 corePoolSize 时,多余的空闲线程的存活时长,即空闲线程在多长时长内销毁unit, 是 keepAliveTime 时长单位
workQueue,任务队列,把任务提交到该任务队列中等待执行
threadFactory,线程工厂,用于创建线程
handler 拒绝策略,当任务太多来不及处理时,如何拒绝
说明:
workQueue 工作队列是指提交未执行的任务队列 , 它 是BlockingQueue 接口的对象,仅用于存储 Runnable 任务.根据队列功能分类,在 ThreadPoolExecutor 构造方法中可以使用以下几种阻塞队列:
直接提交队列,由 SynchronousQueue 对象提供,该队列没有容量,提交给线程池的任务不会被真实的保存,总是将新的任务提交给线程执行,如果没有空闲线程,则尝试创建新的线程,如果线程数量已经达到 maxinumPoolSize 规定的最大值则执行拒绝策略.
有 界 任 务 队 列 , 由 ArrayBlockingQueue 实 现 , 在 创 建ArrayBlockingQueue 对象时,可以指定一个容量. 当有任务需要执行时,如果线程池中线程数小于 corePoolSize 核心线程数则创建新的线程;如果大于 corePoolSize 核心线程数则加入等待队列.如果队列已满则无法加入,在线程数小于 maxinumPoolSize 指定的最大线程 数 前 提 下 会 创 建 新 的 线 程 来 执 行 , 如 果 线 程 数 大 于 maxinumPoolSize 最大线程数则执行拒绝策略
无界任务队列,由 LinkedBlockingQueue 对象实现,与有界队列相比,除非系统资源耗尽,否则无界队列不存在任务入队失败的情况. 当有新的任务时,在系统线程数小于 corePoolSize 核心线程数则创建新的线程来执行任务;当线程池中线程数量大于corePoolSize 核心线程数则把任务加入阻塞队列
优先任务队列是通过 PriorityBlockingQueue 实现的,是带有任务优先级的队列 , 是一个特殊的无界队列 . 不管 是ArrayBlockingQueue 队列还是 LinkedBlockingQueue 队列都是按照先进先出算法处理任务的.在 PriorityBlockingQueue 队列中可以根据任务优先级顺序先后执行.
1.4.4 拒绝策略
ThreadPoolExecutor 构造方法的最后一个参数指定了拒绝策略.当提交给线程池的任务量超过实际承载能力时,如何处理? 即线程池中
的线程已经用完了,等待队列也满了,无法为新提交的任务服务,可以通过拒绝策略来处理这个问题. JDK 提供了四种拒绝策略:
AbortPolicy 策略,会抛出异常
CallerRunsPolicy 策略,只要线程池没关闭,会在调用者线程中运行当前被丢弃的任务
DiscardOldestPolicy 将任务队列中最老的任务丢弃,尝试再次提交新任务
DiscardPolicy 直接丢弃这个无法处理的任务
Executors 工具类提供的静态方法返回的线程池默认的拒绝策略是AbortPolicy 抛出异常,如果内置的拒绝策略无法满足实际需求,可以扩展 RejectedExecutionHandler 接口
package com.sevattal.threadpool;
import java.util.Random;
import java.util.concurrent.*;
/**
* 自定义拒绝策略
*/
public class Test03 {
public static void main(String[] args) {
//定义任务
Runnable r = new Runnable() {
@Override
public void run() {
int num = new Random().nextInt(5);
System.out.println(Thread.currentThread().getId() + "--" +
System.currentTimeMillis() + "开始睡眠" + num + "秒");
try {
TimeUnit.SECONDS.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//创建线程池, 自定义拒绝策略
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new
RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//r 就是请求的任务, executor 就是当前线程池
System.out.println(r + " is discarding..");
}
});
//向线程池提交若干任务
for (int i = 0; i < Integer.MAX_VALUE; i++) {
threadPoolExecutor.submit(r);
}
}
}
1.4.5 ThreadFactory
线程池中的线程从哪儿来的? 答案就是 ThreadFactory.
ThreadFactory 是一个接口,只有一个用来创建线程的方法:Thread newThread(Runnable r);
当线程池中需要创建线程时就会调用该方法
package com.sevattal.threadpool;
import java.util.Random;
import java.util.concurrent.*;
/**
* 自定义线程工厂
*/
public class Test04 {
public static void main(String[] args) throws InterruptedException {
//定义任务
Runnable r = new Runnable() {
@Override
public void run() {
int num = new Random().nextInt(10);
System.out.println(Thread.currentThread().getId() + "--" +
System.currentTimeMillis() + "开始睡眠:" + num + "秒");
try {
TimeUnit.SECONDS.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//创建线程池, 使用自定义线程工厂, 采用默认的拒绝策略是抛出异常
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
//根据参数 r 接收的任务,创建一个线程
Thread t = new Thread( r );
t.setDaemon(true); //设置为守护线程, 当主线程运行结束,线程池中的线程会自动退出
System.out.println("创建了线程: " + t);
return t ;
}
});
//提交 5 个任务, 当给当前线程池提交的任务超过 5 个时,线程池默认抛出异常
for (int i = 0; i < 5; i++) {
executorService.submit(r);
}
//主线程睡眠
Thread.sleep(10000);
//主线程睡眠超时, 主线程结束, 线程池中的线程会自动退出
}
}
1.4.6 监控线程池
ThreadPoolExecutor 提供了一组方法用于监控线程池
int getActiveCount() 获得线程池中当前活动线程的数量
long getCompletedTaskCount() 返回线程池完成任务的数量
int getCorePoolSize() 线程池中核心线程的数量
int getLargestPoolSize() 返回线程池曾经达到的线程的最大数
int getMaximumPoolSize() 返回线程池的最大容量
int getPoolSize() 当前线程池的大小
BlockingQueue
long getTaskCount() 返回线程池收到的任务总数
package com.sevattal.threadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 监控线程池
*/
public class Test05 {
public static void main(String[] args) throws InterruptedException {
//先定义任务
Runnable r = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getId()
+ " 编号 的线程开始执 行: " + System.currentTimeMillis());
try {
Thread.sleep(10000); //线程睡眠 20 秒,模拟任务执行时长
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//定义线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), new
ThreadPoolExecutor.DiscardPolicy());
//向线程池提交 30 个任务
for (int i = 0; i < 30; i++) {
poolExecutor.submit(r);
System.out.println(" 当 前 线 程 池 核 心 线 程 数 量 : "
+ poolExecutor.getCorePoolSize() + ", 最大线程数:"
+ poolExecutor.getMaximumPoolSize() + ",当前线程池大小 :"
+ poolExecutor.getPoolSize() + ", 活 动 线 程 数 量 :" +
poolExecutor.getActiveCount()+ ",收到任务数量:" + poolExecutor.getTaskCount()
+ ",完成任务数 : " + poolExecutor.getCompletedTaskCount() + ", 等 待 任 务 数 :"
+ poolExecutor.getQueue().size()) ;
TimeUnit.MILLISECONDS.sleep(500);
}
System.out.println("-----------------------------------------------");
while ( poolExecutor.getActiveCount() >= 0 ){
System.out.println(" 当 前 线 程 池 核 心 线 程 数 量 : "
+ poolExecutor.getCorePoolSize() + ", 最大线程数:" + poolExecutor.getMaximumPoolSize()
+ ",当前线程池大小 :" + poolExecutor.getPoolSize() + ", 活 动 线 程 数 量 :" +
poolExecutor.getActiveCount()+ ",收到任务数量:" +
poolExecutor.getTaskCount() + ",完成任务数 : " +
poolExecutor.getCompletedTaskCount() + ", 等 待 任 务 数 :" +
poolExecutor.getQueue().size()) ;
Thread.sleep(1000);
}
}
}
1.4.7 扩展线程池
有时需要对线程池进行扩展,如在监控每个任务的开始和结束时间,或者自定义一些其他增强的功能.
ThreadPoolExecutor 线程池提供了两个方法:
protected void afterExecute(Runnable r, Throwable t)
protected void beforeExecute(Thread t, Runnable r)
在线程池执行某个任务前会调用 beforeExecute()方法,在任务结束后(任务异常退出)会执行 afterExecute()方法
查看 ThreadPoolExecutor 源码,在该类中定义了一个内部类 Worker, ThreadPoolExecutor 线程池中的工作线程就是 Worker 类的实例, Worker 实例在执行时会调用 beforeExecute()与 afterExecute()方法
package com.sevattal.threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 扩展线程池
*/
public class Test06 {
//定义任务类
private static class MyTask implements Runnable{
String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(name + "任务正在被线程 " + Thread.currentThread().getId() +
" 执行");
try {
Thread.sleep(1000); //模拟任务执行时长
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
//定义扩展线程池, 可以定义线程池类继承 ThreadPoolExecutor,在子类中重写beforeExecute()/afterExecute()方法
//也可以直接使用 ThreadPoolExecutor 的内部类
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>() ){
//在内部类中重写任务开始方法
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(t.getId() + "线程准备执行任务: " + ((MyTask)r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println( ((MyTask)r).name + "任务执行完毕");
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
//向线程池中添加任务
for (int i = 0; i < 5; i++) {
MyTask task = new MyTask("task-" + i);
executorService.execute(task);
}
//关闭线程池
executorService.shutdown(); //关闭线程池仅仅是说线程池不再接收新的任务 , 线程池中已接收的任务正常执行完毕
}
}
1.4.8 优化线程池大小
线程池大小对系统性能是有一定影响的,过大或者过小都会无法发挥最优的系统性能, 线程池大小不需要非常精确,只要避免极大或者极小的情况即可, 一般来说,线程池大小需要考虑 CPU 数量,内存大小等因素. 在
线程池大小 = CPU 的数量 * 目标 CPU 的使用率 * ( 1 + 等待时间与计算时间的比)
1.4.9 线程池死锁
如果在线程池中执行的 任务 A 在执行过程中又向线程池提交了任务 B, 任务 B 添加到了线程池的等待队列中, 如果任务 A 的结束需要等待任务 B 的执行结果. 就有可能会出现这种情况: 线程池中所有的工作线程都处于等待任务处理结果,而这些任务在阻塞队列中等待执行, 线程池中没有可以对阻塞队列中的任务进行处理的线程,这种等待会一直持续下去,从而造成死锁.
适合给线程池提交相互独立的任务,而不是彼此依赖的任务. 对于
彼此依赖的任务,可以考虑分别提交给不同的线程池来执行.
1.4.10 线程池中的异常处理
在使用 ThreadPoolExecutor 进行 submit 提交任务时,有的任务抛出了异常,但是线程池并没有进行提示,即线程池把任务中的异常给吃掉了,可以把 submit 提交改为 execute 执行,也可以对 ThreadPoolExecutor 线程池进行扩展.对提交的任务进行包装:
package com.sevattal.threadpool;
import java.util.concurrent.*;
/**
* 自定义线程池类,对 ThreadPoolExecutor 进行扩展
*/
public class Test08 {
//自定义线程池类
private static class TraceThreadPollExecutor extends ThreadPoolExecutor{
public TraceThreadPollExecutor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
//定义方法,对执行的任务进行包装,接收两个参数,第一个参数接收要执行的任务, 第二个参数是一个 Exception 异常
public Runnable wrap( Runnable task, Exception exception){
return new Runnable() {
@Override
public void run() {
try {
task.run();
}catch (Exception e ){
exception.printStackTrace();
throw e;
}
}
};
}
//重写 submit 方法
@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task, new Exception("客户跟踪异常")));
}
@Override
public void execute(Runnable command) {
super.execute(wrap(command, new Exception("客户跟踪异常")));
}
}
//定义类实现 Runnable 接口,用于计算两个数相除
private static class DivideTask implements Runnable{
private int x;
private int y;
public DivideTask(int x, int y) {
this.x = x;
this.y = y;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "计算:" + x + " / " + y + " = " + (x/y));
}
}
public static void main(String[] args) {
//创建线程池
// ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<>())
//使用自定义的线程池
ThreadPoolExecutor poolExecutor = new TraceThreadPollExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
//向线程池中添加计算两个数相除的任务
for (int i = 0; i < 5; i++) {
poolExecutor.submit(new DivideTask(10, i));
// poolExecutor.execute(new DivideTask(10, i));
}
}
}
1.4.11 ForkJoinPool 线程池
“分而治之”是一个有效的处理大数据的方法,著名的 MapReduce就是采用这种分而治之的思路. 简单点说,如果要处理的 1000 个数据, 但是我们不具备处理1000个数据的能力,可以只处理10个数据, 可以把这 1000 个数据分阶段处理 100 次,每次处理 10 个,把 100 次的处理结果进行合成,形成最后这 1000 个数据的处理结果.
把一个大任务调用 fork()方法分解为若干小的任务,把小任务的处理结果进行 join()合并为大任务的结果
系统对 ForkJoinPool 线程池进行了优化,提交的任务数量与线程的数量不一定是一对一关系.在多数情况下,一个物理线程实际上需要处理多个逻辑任务.
ForkJoinPool 线程池中最常用 的方法是:
向线程池提交一个 ForkJoinTask 任务. ForkJoinTask 任务支持 fork()分解与 join()等待的 任 务 . ForkJoinTask 有 两 个 重 要 的 子 类 :RecursiveAction 和RecursiveTask ,它们的区别在于 RecursiveAction 任务没有返回值, RecursiveTask 任务可以带有返回值
package com.sevattal.threadpool;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* 演示 ForkJoinPool 线程池的使用
* 使用该线程池模拟数列求和
*/
public class Test09 {
//计算数列的和, 需要返回结果,可以定义任务继承 RecursiveTask
private static class CountTask extends RecursiveTask<Long>{
private static final int THRESHOLD = 10000; //定义数据规模的阈值,允许计算10000 个数内的和,超过该阈值的数列就需要分解
private static final int TASKNUM = 100; //定义每次把大任务分解为 100 个小任务
private long start; //计算数列的起始值
private long end; //计算数列的结束值
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
//重写 RecursiveTask 类的 compute()方法,计算数列的结果
@Override
protected Long compute() {
long sum = 0 ; //保存计算的结果
//判断任务是否需要继续分解,如果当前数列 end 与 start 范围的数超过阈值THRESHOLD,就需要继续分解
if ( end - start < THRESHOLD){
//小于阈值可以直接计算
for (long i = start ; i <= end; i++){
sum += i;
}
}else { //数列范围超过阈值,需要继续分解
//约定每次分解成 100 个小任务,计算每个任务的计算量
long step = (start + end ) / TASKNUM;
//start = 0 , end = 200000, step = 2000, 如果计算[0,200000]范围内数列的
//和, 把该范围的数列分解为 100 个小任务,每个任务计算 2000 个数即可
//注意,如果任务划分的层次很深,即 THRESHOLD 阈值太小,每个任务的计
//算量很小,层次划分就会很深,可能出现两种情况:一是系统内的线程数量会越积越多,导致性
//能下降严重; 二是分解次数过多,方法调用过多可能会导致栈溢出
//创建一个存储任务的集合
ArrayList<CountTask> subTaskList = new ArrayList<>();
long pos = start; //每个任务的起始位置
for (int i = 0; i < TASKNUM; i++) {
long lastOne = pos + step; //每个任务的结束位置
//调整最后一个任务的结束位置
if ( lastOne > end ){
lastOne = end;
}
//创建子任务
CountTask task = new CountTask(pos, lastOne);
//把任务添加到集合中
subTaskList.add(task);
//调用 for()提交子任务
task.fork();
//调整下个任务的起始位置
pos += step + 1;
}
//等待所有的子任务结束后,合并计算结果
for (CountTask task : subTaskList) {
sum += task.join(); //join()会一直等待子任务执行完毕返回执行结果
}
}
return sum;
}
}
public static void main(String[] args) {
//创建 ForkJoinPool 线程池
ForkJoinPool forkJoinPool = new ForkJoinPool();
//创建一个大的任务
CountTask task = new CountTask(0L, 200000L);
//把大任务提交给线程池
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try {
Long res = result.get(); //调用任务的 get()方法返回结果
System.out.println("计算数列结果为:" + res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//验证
long s = 0L;
for (long i = 0; i <= 200000 ; i++) {
s += i;
}
System.out.println(s);
}
}