线程池有点像数据库连接池, 调用连接池的关闭方法, 实际上那个方法被重写了, 会将连接返回连接池.
线程池也是类似, 找到了一篇美团技术团队的文章, 详细解释了线程池.
简单的说, 如果是反复重复的任务需要并行, 由于自行编写并发程序很麻烦, 所以就可以交给线程池来进行操作.
- 线程池 ThreadPoolExecutor
- 设置线程池参数
- 扩展ThreadPoolExecutor
- 线程池的一些相关方法
线程池 ThreadPoolExecutor
这个类位于java.util.concurrent中, 包括其父类和子类. Executor类是线程池的基础, 实际使用的ThreadPoolExecutor就是继承这个类, 将Runnable类型的任务提交给线程池就可以被线程池执行.
创建线程池的工厂叫做Executors, 通过工厂类的静态方法可以创建不同类型的线程池, 都是ExecutorService类型的对象, ExecutorService继承自Executor, 常见的有如下:
Executors.newFixedThreadPool(int nThreads)
, 创建一个固定线程数量的线程池
Executors.newSingleThreadExecutor()
, 创建一个只有一个线程的线程池, 所有任务会保存在一个队列中依次执行
Executors.newCachedThreadPool()
, 返回自行控制的线程池, 也是比较常见的用法
Executors.newSingleThreadScheduledExecutor()
, 返回一个ScheduledExecutorService对象, 单线程, 可以指定定时任务
Executors.newScheduledThreadPool(int poolSize)
, 返回带有计划功能的, 可指定线程多少的线程池对象.
只要理解了Executors与Executor之间的关系, 一个是工厂, 一个是工厂创建出来的池子, 继承关系就容易理解了.
如果想要微调参数, 则就可以使用ThreadPoolExecutor的构造函数来进行微调. 因为返回的池子其实内部都是使用了ThreadPoolExecutor类.
让线程执行任务只需要调用.submit()方法即可, 方法接受Runnable和Callable对象.
写个简单小例子:
//一个线程类
public class MyThread implements Runnable {
@Override
public void run() {
System.out.println("Start at " + System.currentTimeMillis() + " " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolTest {
//创建一个4个大小的线程池
private static ExecutorService newPool = Executors.newFixedThreadPool(4);
//提交10个任务
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
newPool.submit(new MyThread());
}
newPool.shutdown();
}
}
可见线程一次提交4个, 如果改成其他种类, 会发现每次提交的任务数量会有变化.
注意最后的shutdown(), 如果不关闭线程池, 执行线程池的主线程不会退出.
也可以提交Callable对象, 不过目前还都是无法获取结果, 只能操作共享变量的Thread对象, 暂且先这么用着.
设置线程池参数
使用工厂方法, 基本上就是将策略交给了内置的策略. 如果想精确控制线程池的参数, 就可以在ThreadPoolExecutor的构造器中进行设置. 源码中参数最多的重载构造函数如下:
public ThreadPoolExecutor(int corePoolSize,//线程池中的线程数量
int maximumPoolSize,//线程池中的最大线程数量
long keepAliveTime,//线程数量超过corePoolSize之后, 线程的最大活动时间
TimeUnit unit,//上一个参数的单位
BlockingQueue<Runnable> workQueue,//阻塞任务队列, 所谓阻塞队列, 队列为空时候读取会阻塞, 队列满的时候添加任务会阻塞
ThreadFactory threadFactory, // 自定义的创建线程的工厂
RejectedExecutionHandler handler // 拒绝策略)
这其中BlockingQueue可以自行控制, 决定传入什么样子的队列, 比如直接提交队列, 有界任务队列, 无界任务队列, 优先任务队列等等, 都需要实现BlockingQueue接口.
ThreadFactory是如何创建线程工厂, 如果需要自定义每次线程如何创建, 或者在创建线程之前做一些事情, 就可以传入一个自定义的ThreadFactory对象.
拒绝策略是指当一些情况出现的时候, 要如何分配任务(拒绝任务也是任务分配的一种). JDK内置有四种策略, 都实现了RejectedExecutionHandler接口, 详细如下:
AbortPolicy
, 直接抛出异常, 也是默认的方式
CallerRunsPolicy
, 只要线程池没有关闭, 直接在调用者线程里运行任务
DiscardOldestPolicy
, 丢弃最老的一个请求, 也就是即将被执行的任务, 然后再次提交新任务
DiscardPolicy
, 丢弃新任务, 不做任何提示.
四种情况要根据实际情况进行判断. 也可以自定义RejectedExecutionHandler对象.
扩展ThreadPoolExecutor
ThreadPoolExecutor中有三个方法可以自定义:
beforeExecute()
afterExecute()
terminated()
前两个方法会在每个线程被执行的时候, 在线程开始和结束的时候调用, 最后一个方法, 则会在线程池结束的时候被调用.
尝试一下上边一节和这次的各种方法来自定义一些东西玩玩:
import java.util.concurrent.*;
public class CustomizeThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(5,
10, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() {
}, new ThreadFactory() {
//自定义ThreadFactory
@Override
public Thread newThread(Runnable r) {
System.out.println("创建一个新线程");
Thread t = new Thread(r);
System.out.println("创建新线程: " + t.getName() + " 成功");
return t;
}
}, new ThreadPoolExecutor.AbortPolicy()){
//自定义ThreadPoolExecutor的三个生命周期方法
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行 "+ r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完毕 "+ r);
}
@Override
protected void terminated() {
System.out.println("线程池运行结束.");
}
};
for (int i = 0; i < 10; i++) {
pool.submit(new MyThread());
}
pool.shutdown();
}
}
运行结果如下:
创建一个新线程
创建新线程: Thread-0 成功
创建一个新线程
创建新线程: Thread-1 成功
创建一个新线程
创建新线程: Thread-2 成功
创建一个新线程
创建新线程: Thread-3 成功
创建一个新线程
创建新线程: Thread-4 成功
准备执行 java.util.concurrent.FutureTask@66910c95[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@3b37d803[Wrapped task = multithread.ThreadPool.MyThread@44e1d57b]]
准备执行 java.util.concurrent.FutureTask@73bc7a9d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@38f343ca[Wrapped task = multithread.ThreadPool.MyThread@37876d91]]
准备执行 java.util.concurrent.FutureTask@7cfb39a5[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4a949d85[Wrapped task = multithread.ThreadPool.MyThread@64bb913d]]
准备执行 java.util.concurrent.FutureTask@7eaee205[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@31dd3324[Wrapped task = multithread.ThreadPool.MyThread@4395df41]]
准备执行 java.util.concurrent.FutureTask@4e8eae8a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@5825056d[Wrapped task = multithread.ThreadPool.MyThread@6458e989]]
Start at 1593584674017 Thread-1
Start at 1593584674017 Thread-2
Start at 1593584674016 Thread-0
Start at 1593584674017 Thread-3
Start at 1593584674017 Thread-4
执行完毕 java.util.concurrent.FutureTask@7cfb39a5[Completed normally]
准备执行 java.util.concurrent.FutureTask@524b5414[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@b0f7684[Wrapped task = multithread.ThreadPool.MyThread@dfad302]]
Start at 1593584675035 Thread-1
执行完毕 java.util.concurrent.FutureTask@66910c95[Completed normally]
执行完毕 java.util.concurrent.FutureTask@73bc7a9d[Completed normally]
准备执行 java.util.concurrent.FutureTask@3ce1e52a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4a606b02[Wrapped task = multithread.ThreadPool.MyThread@2065a844]]
Start at 1593584675036 Thread-0
准备执行 java.util.concurrent.FutureTask@e5f4084[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@76121ff2[Wrapped task = multithread.ThreadPool.MyThread@70aa8d0e]]
Start at 1593584675036 Thread-2
执行完毕 java.util.concurrent.FutureTask@4e8eae8a[Completed normally]
准备执行 java.util.concurrent.FutureTask@1830ccdb[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@296a1f26[Wrapped task = multithread.ThreadPool.MyThread@5ffa45c8]]
执行完毕 java.util.concurrent.FutureTask@7eaee205[Completed normally]
Start at 1593584675037 Thread-4
准备执行 java.util.concurrent.FutureTask@f3d475a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@44438c8f[Wrapped task = multithread.ThreadPool.MyThread@4f4db265]]
Start at 1593584675037 Thread-3
执行完毕 java.util.concurrent.FutureTask@524b5414[Completed normally]
执行完毕 java.util.concurrent.FutureTask@e5f4084[Completed normally]
执行完毕 java.util.concurrent.FutureTask@3ce1e52a[Completed normally]
执行完毕 java.util.concurrent.FutureTask@f3d475a[Completed normally]
执行完毕 java.util.concurrent.FutureTask@1830ccdb[Completed normally]
线程池运行结束.
可以看到, 虽然有10个任务, 但是只创建了5个线程, 然后在每次执行任务之前, 都会打印出相关信息. 最后在线程池结束的时候, terminated()被运行.
线程池的一些相关方法
这个关系到ThreadPoolExecutor的运行状态, 一共有5种:
RUNNING
, 能够接受新任务, 也在处理新任务
SHUTDOWN
, 不再接受新任务, 但会把遗留任务执行完毕
STOP
, 不接受新任务也不再处理尚未处理的任务, 同时中断正在处理的任务
TIDYING
, 所有任务都终止, 逐个干掉线程, 线程数量为0, 实际上就是清理阶段
TERMINATED
, 调用terminated()方法之后进入该状态, 其实就是彻底结束
所以要注意的是如下方法:
shutdown()
, 最常用的, 完成工作提交之后调用该方法, 转入SHUTDOWN状态, 线程池会在处理完毕之后结束
shutdownNow()
, 直接进入STOP状态, 相当于突然中止线程池的工作
isShutdown()
, 判断是不是在SHUTDOWN状态
isTerminated()
, 判断是不是TERMINATED
execute(Runnable command)
, 这个也是提交任务的方法, 但是不返回任何值
submit(Runnable task)
, 这个也是提交任务的方法, 返回Future对象, 这个就后边再看了.
SHUTDOWN和STOP之后, 在线程数量为0后, 都会进入TIDYING然后是TERMINATED阶段.
好了, 现在可以把线程池加入到自己的工具中来了.